2 * Unix SMB/CIFS implementation.
3 * Samba internal messaging functions
4 * Copyright (C) 2013 by Volker Lendecke
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "util/util.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/select.h"
26 #include "lib/util/debug.h"
27 #include "lib/messages_dgm.h"
28 #include "lib/util/genrand.h"
29 #include "lib/util/dlinklist.h"
30 #include "lib/pthreadpool/pthreadpool_tevent.h"
31 #include "lib/util/msghdr.h"
32 #include "lib/util/iov_buf.h"
33 #include "lib/util/blocking.h"
34 #include "lib/util/tevent_unix.h"
36 #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
40 * This will carry enough for a socket path
42 char buf
[sizeof(struct sockaddr_un
)];
46 * We can only have one tevent_fd per dgm_context and per
47 * tevent_context. Maintain a list of registered tevent_contexts per
50 struct messaging_dgm_fde_ev
{
51 struct messaging_dgm_fde_ev
*prev
, *next
;
54 * Backreference to enable DLIST_REMOVE from our
55 * destructor. Also, set to NULL when the dgm_context dies
56 * before the messaging_dgm_fde_ev.
58 struct messaging_dgm_context
*ctx
;
60 struct tevent_context
*ev
;
61 struct tevent_fd
*fde
;
64 struct messaging_dgm_out
{
65 struct messaging_dgm_out
*prev
, *next
;
66 struct messaging_dgm_context
*ctx
;
73 struct tevent_queue
*queue
;
74 struct tevent_timer
*idle_timer
;
77 struct messaging_dgm_in_msg
{
78 struct messaging_dgm_in_msg
*prev
, *next
;
79 struct messaging_dgm_context
*ctx
;
88 struct messaging_dgm_context
{
89 struct tevent_context
*ev
;
91 struct sun_path_buf socket_dir
;
92 struct sun_path_buf lockfile_dir
;
96 struct messaging_dgm_in_msg
*in_msgs
;
98 struct messaging_dgm_fde_ev
*fde_evs
;
99 void (*recv_cb
)(struct tevent_context
*ev
,
105 void *recv_cb_private_data
;
107 bool *have_dgm_context
;
109 struct pthreadpool_tevent
*pool
;
110 struct messaging_dgm_out
*outsocks
;
/*
 * NOTE(review): extraction-damaged fragment — original lines 115-118,
 * 120-123 and 125-130 are missing from this view (declarations, the
 * FD_CLOEXEC OR-in, error returns, closing brace). Comments below are
 * hedged accordingly.
 */
113 /* Set socket close on exec. */
114 static int prepare_socket_cloexec(int sock
)
/* Read the current file-descriptor flags for 'sock'. */
119 flags
= fcntl(sock
, F_GETFD
, 0);
/*
 * Write the flags back; presumably FD_CLOEXEC was OR'ed into 'flags'
 * on a missing line — TODO confirm against the upstream file. On
 * failure the (missing) body most likely returns an errno value.
 */
124 if (fcntl(sock
, F_SETFD
, flags
) == -1) {
/*
 * Close every descriptor in the array 'fds' of length 'num_fds'.
 * NOTE(review): only the function header and the loop header survive in
 * this view (original lines 132-134 and the loop body are missing);
 * presumably the body close()s fds[i], possibly skipping -1 entries —
 * TODO confirm against the upstream file.
 */
131 static void close_fd_array(int *fds
, size_t num_fds
)
135 for (i
= 0; i
< num_fds
; i
++) {
/*
 * tevent timer callback: tear down an idle outgoing socket.
 * NOTE(review): fragment — original lines 153-154, 157-158, 160 and
 * everything after 161 are missing (including the qlen==0 TALLOC_FREE
 * path implied by the header comment below).
 */
146 * The idle handler can free the struct messaging_dgm_out *,
147 * if it's unused (qlen of zero) which closes the socket.
150 static void messaging_dgm_out_idle_handler(struct tevent_context
*ev
,
151 struct tevent_timer
*te
,
152 struct timeval current_time
,
/* Recover the messaging_dgm_out from the opaque private_data pointer. */
155 struct messaging_dgm_out
*out
= talloc_get_type_abort(
156 private_data
, struct messaging_dgm_out
);
/* The timer that invoked us is one-shot; drop the stale pointer. */
159 out
->idle_timer
= NULL
;
/* Queue length decides (in the missing tail) whether 'out' is freed. */
161 qlen
= tevent_queue_length(out
->queue
);
/*
 * (Re)arm the 1-second idle timer on an outgoing socket.
 * NOTE(review): fragment — original lines 169-171, 173-175, 177,
 * 179-181, 185-187, 191 and the function tail are missing; the control
 * flow around the qlen check and the early returns cannot be confirmed
 * from this view.
 */
168 * Setup the idle handler to fire afer 1 second if the
172 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out
*out
)
176 qlen
= tevent_queue_length(out
->queue
);
/* Presumably taken when qlen != 0: pending sends, no idle timer wanted. */
178 TALLOC_FREE(out
->idle_timer
);
/* An existing timer is pushed out by one second rather than recreated. */
182 if (out
->idle_timer
!= NULL
) {
183 tevent_update_timer(out
->idle_timer
,
184 tevent_timeval_current_ofs(1, 0));
/* Otherwise create a fresh one-second timer owned by 'out'. */
188 out
->idle_timer
= tevent_add_timer(
189 out
->ctx
->ev
, out
, tevent_timeval_current_ofs(1, 0),
190 messaging_dgm_out_idle_handler
, out
);
192 * No NULL check, we'll come back here. Worst case we're
197 static int messaging_dgm_out_destructor(struct messaging_dgm_out
*dst
);
198 static void messaging_dgm_out_idle_handler(struct tevent_context
*ev
,
199 struct tevent_timer
*te
,
200 struct timeval current_time
,
204 * Connect to an existing rendezvous point for another
205 * pid - wrapped inside a struct messaging_dgm_out *.
208 static int messaging_dgm_out_create(TALLOC_CTX
*mem_ctx
,
209 struct messaging_dgm_context
*ctx
,
210 pid_t pid
, struct messaging_dgm_out
**pout
)
212 struct messaging_dgm_out
*out
;
213 struct sockaddr_un addr
= { .sun_family
= AF_UNIX
};
216 char addr_buf
[sizeof(addr
.sun_path
) + (3 * sizeof(unsigned) + 2)];
218 out
= talloc(mem_ctx
, struct messaging_dgm_out
);
223 *out
= (struct messaging_dgm_out
) {
229 out_pathlen
= snprintf(addr_buf
, sizeof(addr_buf
),
230 "%s/%u", ctx
->socket_dir
.buf
, (unsigned)pid
);
231 if (out_pathlen
< 0) {
234 if ((size_t)out_pathlen
>= sizeof(addr
.sun_path
)) {
239 memcpy(addr
.sun_path
, addr_buf
, out_pathlen
+ 1);
241 out
->queue
= tevent_queue_create(out
, addr
.sun_path
);
242 if (out
->queue
== NULL
) {
247 out
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
248 if (out
->sock
== -1) {
252 DLIST_ADD(ctx
->outsocks
, out
);
253 talloc_set_destructor(out
, messaging_dgm_out_destructor
);
256 ret
= connect(out
->sock
,
257 (const struct sockaddr
*)(const void *)&addr
,
259 } while ((ret
== -1) && (errno
== EINTR
));
265 ret
= set_blocking(out
->sock
, false);
269 out
->is_blocking
= false;
280 static int messaging_dgm_out_destructor(struct messaging_dgm_out
*out
)
282 DLIST_REMOVE(out
->ctx
->outsocks
, out
);
284 if ((tevent_queue_length(out
->queue
) != 0) &&
285 (getpid() == out
->ctx
->pid
)) {
287 * We have pending jobs. We can't close the socket,
288 * this has been handed over to messaging_dgm_out_queue_state.
293 if (out
->sock
!= -1) {
301 * Find the struct messaging_dgm_out * to talk to pid.
302 * If we don't have one, create it. Set the timer to
303 * delete after 1 sec.
306 static int messaging_dgm_out_get(struct messaging_dgm_context
*ctx
, pid_t pid
,
307 struct messaging_dgm_out
**pout
)
309 struct messaging_dgm_out
*out
;
312 for (out
= ctx
->outsocks
; out
!= NULL
; out
= out
->next
) {
313 if (out
->pid
== pid
) {
319 ret
= messaging_dgm_out_create(ctx
, ctx
, pid
, &out
);
326 * shouldn't be possible, should be set if messaging_dgm_out_create
327 * succeeded. This check is to satisfy static checker
332 messaging_dgm_out_rearm_idle_timer(out
);
/*
 * Thread-safe sendmsg() wrapper used by both the direct (qlen == 0)
 * and the pthreadpool-queued send paths.
 * NOTE(review): fragment — original lines 348-353 (trailing parameters,
 * likely an err out-parameter), 356-357, 360-362, 364-371, 373-374 and
 * the tail after 376 are missing, including the cmsg buffer declaration
 * that 'buf' on line 372 refers to.
 */
339 * This function is called directly to send a message fragment
340 * when the outgoing queue is zero, and from a pthreadpool
341 * job thread when messages are being queued (qlen != 0).
342 * Make sure *ONLY* thread-safe functions are called within.
345 static ssize_t
messaging_dgm_sendmsg(int sock
,
346 const struct iovec
*iov
, int iovlen
,
347 const int *fds
, size_t num_fds
,
354 * Do the actual sendmsg syscall. This will be called from a
355 * pthreadpool helper thread, so be careful what you do here.
/* Build the msghdr; discard_const_p strips const for msg_iov only. */
358 msg
= (struct msghdr
) {
359 .msg_iov
= discard_const_p(struct iovec
, iov
),
/* First call sizes the SCM_RIGHTS control data for num_fds fds... */
363 fdlen
= msghdr_prep_fds(&msg
, NULL
, 0, fds
, num_fds
);
/* ...second call fills the (missing-from-view) control buffer 'buf'. */
372 msghdr_prep_fds(&msg
, buf
, fdlen
, fds
, num_fds
);
/* Retry the syscall on EINTR; the enclosing do-brace is not in view. */
375 ret
= sendmsg(sock
, &msg
, 0);
376 } while ((ret
== -1) && (errno
== EINTR
));
385 struct messaging_dgm_out_queue_state
{
386 struct tevent_context
*ev
;
387 struct pthreadpool_tevent
*pool
;
389 struct tevent_req
*req
;
390 struct tevent_req
*subreq
;
401 static int messaging_dgm_out_queue_state_destructor(
402 struct messaging_dgm_out_queue_state
*state
);
403 static void messaging_dgm_out_queue_trigger(struct tevent_req
*req
,
405 static void messaging_dgm_out_threaded_job(void *private_data
);
406 static void messaging_dgm_out_queue_done(struct tevent_req
*subreq
);
409 * Push a message fragment onto a queue to be sent by a
410 * threadpool job. Makes copies of data/fd's to be sent.
411 * The running tevent_queue internally creates an immediate
412 * event to schedule the write.
415 static struct tevent_req
*messaging_dgm_out_queue_send(
416 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
417 struct messaging_dgm_out
*out
,
418 const struct iovec
*iov
, int iovlen
, const int *fds
, size_t num_fds
)
420 struct tevent_req
*req
;
421 struct messaging_dgm_out_queue_state
*state
;
422 struct tevent_queue_entry
*e
;
426 req
= tevent_req_create(out
, &state
,
427 struct messaging_dgm_out_queue_state
);
432 state
->pool
= out
->ctx
->pool
;
433 state
->sock
= out
->sock
;
437 * Go blocking in a thread
439 if (!out
->is_blocking
) {
440 int ret
= set_blocking(out
->sock
, true);
442 tevent_req_error(req
, errno
);
443 return tevent_req_post(req
, ev
);
445 out
->is_blocking
= true;
448 buflen
= iov_buflen(iov
, iovlen
);
450 tevent_req_error(req
, EMSGSIZE
);
451 return tevent_req_post(req
, ev
);
454 state
->buf
= talloc_array(state
, uint8_t, buflen
);
455 if (tevent_req_nomem(state
->buf
, req
)) {
456 return tevent_req_post(req
, ev
);
458 iov_buf(iov
, iovlen
, state
->buf
, buflen
);
460 state
->fds
= talloc_array(state
, int, num_fds
);
461 if (tevent_req_nomem(state
->fds
, req
)) {
462 return tevent_req_post(req
, ev
);
465 for (i
=0; i
<num_fds
; i
++) {
469 for (i
=0; i
<num_fds
; i
++) {
471 state
->fds
[i
] = dup(fds
[i
]);
473 if (state
->fds
[i
] == -1) {
476 close_fd_array(state
->fds
, num_fds
);
478 tevent_req_error(req
, ret
);
479 return tevent_req_post(req
, ev
);
483 talloc_set_destructor(state
, messaging_dgm_out_queue_state_destructor
);
485 e
= tevent_queue_add_entry(out
->queue
, ev
, req
,
486 messaging_dgm_out_queue_trigger
, req
);
487 if (tevent_req_nomem(e
, req
)) {
488 return tevent_req_post(req
, ev
);
/*
 * talloc destructor for an in-flight queued send.
 * NOTE(review): fragment — original lines 495-498, 500, 505, 508-511
 * and the tail after 513 (early return inside the subreq branch,
 * 'fds' local declaration, closing brace) are missing from this view.
 */
493 static int messaging_dgm_out_queue_state_destructor(
494 struct messaging_dgm_out_queue_state
*state
)
/* A live pthreadpool job means we cannot free state normally. */
499 if (state
->subreq
!= NULL
) {
501 * We're scheduled, but we're destroyed. This happens
502 * if the messaging_dgm_context is destroyed while
503 * we're stuck in a blocking send. There's nothing we
504 * can do but to leak memory.
506 TALLOC_FREE(state
->subreq
);
/* Detach state from the request so it deliberately leaks (see above). */
507 (void)talloc_reparent(state
->req
, NULL
, state
);
/* Normal path: close the duplicated fds held for the send. */
512 num_fds
= talloc_array_length(fds
);
513 close_fd_array(fds
, num_fds
);
518 * tevent_queue callback that schedules the pthreadpool to actually
519 * send the queued message fragment.
522 static void messaging_dgm_out_queue_trigger(struct tevent_req
*req
,
525 struct messaging_dgm_out_queue_state
*state
= tevent_req_data(
526 req
, struct messaging_dgm_out_queue_state
);
528 tevent_req_reset_endtime(req
);
530 state
->subreq
= pthreadpool_tevent_job_send(
531 state
, state
->ev
, state
->pool
,
532 messaging_dgm_out_threaded_job
, state
);
533 if (tevent_req_nomem(state
->subreq
, req
)) {
536 tevent_req_set_callback(state
->subreq
, messaging_dgm_out_queue_done
,
541 * Wrapper function run by the pthread that calls
542 * messaging_dgm_sendmsg() to actually do the sendmsg().
545 static void messaging_dgm_out_threaded_job(void *private_data
)
547 struct messaging_dgm_out_queue_state
*state
= talloc_get_type_abort(
548 private_data
, struct messaging_dgm_out_queue_state
);
550 struct iovec iov
= { .iov_base
= state
->buf
,
551 .iov_len
= talloc_get_size(state
->buf
) };
552 size_t num_fds
= talloc_array_length(state
->fds
);
558 state
->sent
= messaging_dgm_sendmsg(state
->sock
, &iov
, 1,
559 state
->fds
, num_fds
, &state
->err
);
561 if (state
->sent
!= -1) {
564 if (state
->err
!= ENOBUFS
) {
569 * ENOBUFS is the FreeBSD way of saying "Try
570 * again". We have to do polling.
573 ret
= poll(NULL
, 0, msec
);
574 } while ((ret
== -1) && (errno
== EINTR
));
577 * Exponential backoff up to once a second
580 msec
= MIN(msec
, 1000);
585 * Pickup the results of the pthread sendmsg().
588 static void messaging_dgm_out_queue_done(struct tevent_req
*subreq
)
590 struct tevent_req
*req
= tevent_req_callback_data(
591 subreq
, struct tevent_req
);
592 struct messaging_dgm_out_queue_state
*state
= tevent_req_data(
593 req
, struct messaging_dgm_out_queue_state
);
596 if (subreq
!= state
->subreq
) {
600 ret
= pthreadpool_tevent_job_recv(subreq
);
603 state
->subreq
= NULL
;
605 if (tevent_req_error(req
, ret
)) {
608 if (state
->sent
== -1) {
609 tevent_req_error(req
, state
->err
);
612 tevent_req_done(req
);
615 static int messaging_dgm_out_queue_recv(struct tevent_req
*req
)
617 return tevent_req_simple_recv_unix(req
);
620 static void messaging_dgm_out_sent_fragment(struct tevent_req
*req
);
623 * Core function to send a message fragment given a
624 * connected struct messaging_dgm_out * destination.
625 * If no current queue tries to send nonblocking
626 * directly. If not, queues the fragment (which makes
627 * a copy of it) and adds a 60-second timeout on the send.
630 static int messaging_dgm_out_send_fragment(
631 struct tevent_context
*ev
, struct messaging_dgm_out
*out
,
632 const struct iovec
*iov
, int iovlen
, const int *fds
, size_t num_fds
)
634 struct tevent_req
*req
;
638 qlen
= tevent_queue_length(out
->queue
);
643 if (out
->is_blocking
) {
644 int ret
= set_blocking(out
->sock
, false);
648 out
->is_blocking
= false;
651 nsent
= messaging_dgm_sendmsg(out
->sock
, iov
, iovlen
, fds
,
657 if (err
== ENOBUFS
) {
659 * FreeBSD's way of telling us the dst socket
660 * is full. EWOULDBLOCK makes us spawn a
661 * polling helper thread.
666 if (err
!= EWOULDBLOCK
) {
671 req
= messaging_dgm_out_queue_send(out
, ev
, out
, iov
, iovlen
,
676 tevent_req_set_callback(req
, messaging_dgm_out_sent_fragment
, out
);
678 ok
= tevent_req_set_endtime(req
, ev
,
679 tevent_timeval_current_ofs(60, 0));
689 * Pickup the result of the fragment send. Reset idle timer
693 static void messaging_dgm_out_sent_fragment(struct tevent_req
*req
)
695 struct messaging_dgm_out
*out
= tevent_req_callback_data(
696 req
, struct messaging_dgm_out
);
699 ret
= messaging_dgm_out_queue_recv(req
);
703 DBG_WARNING("messaging_out_queue_recv returned %s\n",
707 messaging_dgm_out_rearm_idle_timer(out
);
711 struct messaging_dgm_fragment_hdr
{
718 * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
719 * size chunks and send it.
721 * Message fragments are prefixed by a 64-bit cookie that
722 * stays the same for all fragments. This allows the receiver
723 * to recognise fragments of the same message and re-assemble
724 * them on the other end.
726 * Note that this allows other message fragments from other
727 * senders to be interleaved in the receive read processing,
728 * the combination of the cookie and header info allows unique
729 * identification of the message from a specific sender in
732 * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
733 * then send a single message with cookie set to zero.
735 * Otherwise the message is fragmented into chunks and added
736 * to the sending queue. Any file descriptors are passed only
737 * in the last fragment.
739 * Finally the cookie is incremented (wrap over zero) to
740 * prepare for the next message sent to this channel.
744 static int messaging_dgm_out_send_fragmented(struct tevent_context
*ev
,
745 struct messaging_dgm_out
*out
,
746 const struct iovec
*iov
,
748 const int *fds
, size_t num_fds
)
750 ssize_t msglen
, sent
;
752 struct iovec iov_copy
[iovlen
+2];
753 struct messaging_dgm_fragment_hdr hdr
;
754 struct iovec src_iov
;
760 msglen
= iov_buflen(iov
, iovlen
);
764 if (num_fds
> INT8_MAX
) {
768 if ((size_t) msglen
<=
769 (MESSAGING_DGM_FRAGMENT_LENGTH
- sizeof(uint64_t))) {
772 iov_copy
[0].iov_base
= &cookie
;
773 iov_copy
[0].iov_len
= sizeof(cookie
);
775 memcpy(&iov_copy
[1], iov
,
776 sizeof(struct iovec
) * iovlen
);
779 return messaging_dgm_out_send_fragment(
780 ev
, out
, iov_copy
, iovlen
+1, fds
, num_fds
);
784 hdr
= (struct messaging_dgm_fragment_hdr
) {
790 iov_copy
[0].iov_base
= &out
->cookie
;
791 iov_copy
[0].iov_len
= sizeof(out
->cookie
);
792 iov_copy
[1].iov_base
= &hdr
;
793 iov_copy
[1].iov_len
= sizeof(hdr
);
799 * The following write loop sends the user message in pieces. We have
800 * filled the first two iovecs above with "cookie" and "hdr". In the
801 * following loops we pull message chunks from the user iov array and
802 * fill iov_copy piece by piece, possibly truncating chunks from the
803 * caller's iov array. Ugly, but hopefully efficient.
806 while (sent
< msglen
) {
808 size_t iov_index
= 2;
810 fragment_len
= sizeof(out
->cookie
) + sizeof(hdr
);
812 while (fragment_len
< MESSAGING_DGM_FRAGMENT_LENGTH
) {
815 space
= MESSAGING_DGM_FRAGMENT_LENGTH
- fragment_len
;
816 chunk
= MIN(space
, src_iov
.iov_len
);
818 iov_copy
[iov_index
].iov_base
= src_iov
.iov_base
;
819 iov_copy
[iov_index
].iov_len
= chunk
;
822 src_iov
.iov_base
= (char *)src_iov
.iov_base
+ chunk
;
823 src_iov
.iov_len
-= chunk
;
824 fragment_len
+= chunk
;
826 if (src_iov
.iov_len
== 0) {
835 sent
+= (fragment_len
- sizeof(out
->cookie
) - sizeof(hdr
));
838 * only the last fragment should pass the fd array.
839 * That simplifies the receiver a lot.
842 ret
= messaging_dgm_out_send_fragment(
843 ev
, out
, iov_copy
, iov_index
, NULL
, 0);
845 ret
= messaging_dgm_out_send_fragment(
846 ev
, out
, iov_copy
, iov_index
, fds
, num_fds
);
854 if (out
->cookie
== 0) {
861 static struct messaging_dgm_context
*global_dgm_context
;
863 static int messaging_dgm_context_destructor(struct messaging_dgm_context
*c
);
865 static int messaging_dgm_lockfile_create(struct messaging_dgm_context
*ctx
,
866 pid_t pid
, int *plockfile_fd
,
871 struct sun_path_buf lockfile_name
;
877 ret
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
),
878 "%s/%u", ctx
->lockfile_dir
.buf
, (unsigned)pid
);
882 if ((unsigned)ret
>= sizeof(lockfile_name
.buf
)) {
886 /* no O_EXCL, existence check is via the fcntl lock */
888 lockfile_fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_CREAT
|O_RDWR
,
891 if ((lockfile_fd
== -1) &&
892 ((errno
== ENXIO
) /* Linux */ ||
893 (errno
== ENODEV
) /* Linux kernel bug */ ||
894 (errno
== EOPNOTSUPP
) /* FreeBSD */)) {
896 * Huh -- a socket? This might be a stale socket from
897 * an upgrade of Samba. Just unlink and retry, nobody
898 * else is supposed to be here at this time.
900 * Yes, this is racy, but I don't see a way to deal
901 * with this properly.
903 unlink(lockfile_name
.buf
);
905 lockfile_fd
= open(lockfile_name
.buf
,
906 O_NONBLOCK
|O_CREAT
|O_WRONLY
,
910 if (lockfile_fd
== -1) {
912 DEBUG(1, ("%s: open failed: %s\n", __func__
, strerror(errno
)));
916 lck
= (struct flock
) {
921 ret
= fcntl(lockfile_fd
, F_SETLK
, &lck
);
924 DEBUG(1, ("%s: fcntl failed: %s\n", __func__
, strerror(ret
)));
929 * Directly using the binary value for
930 * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
931 * violation. But including all of ndr here just for this
932 * seems to be a bit overkill to me. Also, messages_dgm might
933 * be replaced sooner or later by something streams-based,
934 * where unique_id generation will be handled differently.
938 generate_random_buffer((uint8_t *)&unique
, sizeof(unique
));
939 } while (unique
== UINT64_C(0xFFFFFFFFFFFFFFFF));
941 unique_len
= snprintf(buf
, sizeof(buf
), "%ju\n", (uintmax_t)unique
);
943 /* shorten a potentially preexisting file */
945 ret
= ftruncate(lockfile_fd
, unique_len
);
948 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__
,
953 written
= write(lockfile_fd
, buf
, unique_len
);
954 if (written
!= unique_len
) {
956 DEBUG(1, ("%s: write failed: %s\n", __func__
, strerror(ret
)));
960 *plockfile_fd
= lockfile_fd
;
965 unlink(lockfile_name
.buf
);
971 static void messaging_dgm_read_handler(struct tevent_context
*ev
,
972 struct tevent_fd
*fde
,
977 * Create the rendezvous point in the file system
978 * that other processes can use to send messages to
982 int messaging_dgm_init(struct tevent_context
*ev
,
984 const char *socket_dir
,
985 const char *lockfile_dir
,
986 void (*recv_cb
)(struct tevent_context
*ev
,
992 void *recv_cb_private_data
)
994 struct messaging_dgm_context
*ctx
;
996 struct sockaddr_un socket_address
;
998 static bool have_dgm_context
= false;
1000 if (have_dgm_context
) {
1004 ctx
= talloc_zero(NULL
, struct messaging_dgm_context
);
1009 ctx
->pid
= getpid();
1010 ctx
->recv_cb
= recv_cb
;
1011 ctx
->recv_cb_private_data
= recv_cb_private_data
;
1013 len
= strlcpy(ctx
->lockfile_dir
.buf
, lockfile_dir
,
1014 sizeof(ctx
->lockfile_dir
.buf
));
1015 if (len
>= sizeof(ctx
->lockfile_dir
.buf
)) {
1017 return ENAMETOOLONG
;
1020 len
= strlcpy(ctx
->socket_dir
.buf
, socket_dir
,
1021 sizeof(ctx
->socket_dir
.buf
));
1022 if (len
>= sizeof(ctx
->socket_dir
.buf
)) {
1024 return ENAMETOOLONG
;
1027 socket_address
= (struct sockaddr_un
) { .sun_family
= AF_UNIX
};
1028 len
= snprintf(socket_address
.sun_path
,
1029 sizeof(socket_address
.sun_path
),
1030 "%s/%u", socket_dir
, (unsigned)ctx
->pid
);
1031 if (len
>= sizeof(socket_address
.sun_path
)) {
1033 return ENAMETOOLONG
;
1036 ret
= messaging_dgm_lockfile_create(ctx
, ctx
->pid
, &ctx
->lockfile_fd
,
1039 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1040 __func__
, strerror(ret
)));
1045 unlink(socket_address
.sun_path
);
1047 ctx
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
1048 if (ctx
->sock
== -1) {
1050 DBG_WARNING("socket failed: %s\n", strerror(ret
));
1055 ret
= prepare_socket_cloexec(ctx
->sock
);
1058 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1064 ret
= bind(ctx
->sock
, (struct sockaddr
*)(void *)&socket_address
,
1065 sizeof(socket_address
));
1068 DBG_WARNING("bind failed: %s\n", strerror(ret
));
1073 talloc_set_destructor(ctx
, messaging_dgm_context_destructor
);
1075 ctx
->have_dgm_context
= &have_dgm_context
;
1077 ret
= pthreadpool_tevent_init(ctx
, UINT_MAX
, &ctx
->pool
);
1079 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1085 global_dgm_context
= ctx
;
1094 * Remove the rendezvous point in the filesystem
1095 * if we're the owner.
1098 static int messaging_dgm_context_destructor(struct messaging_dgm_context
*c
)
1100 while (c
->outsocks
!= NULL
) {
1101 TALLOC_FREE(c
->outsocks
);
1103 while (c
->in_msgs
!= NULL
) {
1104 TALLOC_FREE(c
->in_msgs
);
1106 while (c
->fde_evs
!= NULL
) {
1107 tevent_fd_set_flags(c
->fde_evs
->fde
, 0);
1108 c
->fde_evs
->ctx
= NULL
;
1109 DLIST_REMOVE(c
->fde_evs
, c
->fde_evs
);
1114 if (getpid() == c
->pid
) {
1115 struct sun_path_buf name
;
1118 ret
= snprintf(name
.buf
, sizeof(name
.buf
), "%s/%u",
1119 c
->socket_dir
.buf
, (unsigned)c
->pid
);
1120 if ((ret
< 0) || ((size_t)ret
>= sizeof(name
.buf
))) {
1122 * We've checked the length when creating, so this
1123 * should never happen
1129 ret
= snprintf(name
.buf
, sizeof(name
.buf
), "%s/%u",
1130 c
->lockfile_dir
.buf
, (unsigned)c
->pid
);
1131 if ((ret
< 0) || ((size_t)ret
>= sizeof(name
.buf
))) {
1133 * We've checked the length when creating, so this
1134 * should never happen
1140 close(c
->lockfile_fd
);
1142 if (c
->have_dgm_context
!= NULL
) {
1143 *c
->have_dgm_context
= false;
1149 static void messaging_dgm_validate(struct messaging_dgm_context
*ctx
)
1152 pid_t pid
= getpid();
1153 struct sockaddr_storage addr
;
1154 socklen_t addrlen
= sizeof(addr
);
1155 struct sockaddr_un
*un_addr
;
1156 struct sun_path_buf pathbuf
;
1157 struct stat st1
, st2
;
1161 * Protect against using the wrong messaging context after a
1162 * fork without reinit_after_fork.
1165 ret
= getsockname(ctx
->sock
, (struct sockaddr
*)&addr
, &addrlen
);
1167 DBG_ERR("getsockname failed: %s\n", strerror(errno
));
1170 if (addr
.ss_family
!= AF_UNIX
) {
1171 DBG_ERR("getsockname returned family %d\n",
1172 (int)addr
.ss_family
);
1175 un_addr
= (struct sockaddr_un
*)&addr
;
1177 ret
= snprintf(pathbuf
.buf
, sizeof(pathbuf
.buf
),
1178 "%s/%u", ctx
->socket_dir
.buf
, (unsigned)pid
);
1180 DBG_ERR("snprintf failed: %s\n", strerror(errno
));
1183 if ((size_t)ret
>= sizeof(pathbuf
.buf
)) {
1184 DBG_ERR("snprintf returned %d chars\n", (int)ret
);
1188 if (strcmp(pathbuf
.buf
, un_addr
->sun_path
) != 0) {
1189 DBG_ERR("sockname wrong: Expected %s, got %s\n",
1190 pathbuf
.buf
, un_addr
->sun_path
);
1194 ret
= snprintf(pathbuf
.buf
, sizeof(pathbuf
.buf
),
1195 "%s/%u", ctx
->lockfile_dir
.buf
, (unsigned)pid
);
1197 DBG_ERR("snprintf failed: %s\n", strerror(errno
));
1200 if ((size_t)ret
>= sizeof(pathbuf
.buf
)) {
1201 DBG_ERR("snprintf returned %d chars\n", (int)ret
);
1205 ret
= stat(pathbuf
.buf
, &st1
);
1207 DBG_ERR("stat failed: %s\n", strerror(errno
));
1210 ret
= fstat(ctx
->lockfile_fd
, &st2
);
1212 DBG_ERR("fstat failed: %s\n", strerror(errno
));
1216 if ((st1
.st_dev
!= st2
.st_dev
) || (st1
.st_ino
!= st2
.st_ino
)) {
1217 DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
1218 (int)st2
.st_dev
, (int)st2
.st_ino
,
1219 (int)st1
.st_dev
, (int)st1
.st_ino
);
1231 static void messaging_dgm_recv(struct messaging_dgm_context
*ctx
,
1232 struct tevent_context
*ev
,
1233 uint8_t *msg
, size_t msg_len
,
1234 int *fds
, size_t num_fds
);
1237 * Raw read callback handler - passes to messaging_dgm_recv()
1238 * for fragment reassembly processing.
1241 static void messaging_dgm_read_handler(struct tevent_context
*ev
,
1242 struct tevent_fd
*fde
,
1246 struct messaging_dgm_context
*ctx
= talloc_get_type_abort(
1247 private_data
, struct messaging_dgm_context
);
1251 size_t msgbufsize
= msghdr_prep_recv_fds(NULL
, NULL
, 0, INT8_MAX
);
1252 uint8_t msgbuf
[msgbufsize
];
1253 uint8_t buf
[MESSAGING_DGM_FRAGMENT_LENGTH
];
1256 messaging_dgm_validate(ctx
);
1258 if ((flags
& TEVENT_FD_READ
) == 0) {
1262 iov
= (struct iovec
) { .iov_base
= buf
, .iov_len
= sizeof(buf
) };
1263 msg
= (struct msghdr
) { .msg_iov
= &iov
, .msg_iovlen
= 1 };
1265 msghdr_prep_recv_fds(&msg
, msgbuf
, msgbufsize
, INT8_MAX
);
1267 #ifdef MSG_CMSG_CLOEXEC
1268 msg
.msg_flags
|= MSG_CMSG_CLOEXEC
;
1271 received
= recvmsg(ctx
->sock
, &msg
, 0);
1272 if (received
== -1) {
1273 if ((errno
== EAGAIN
) ||
1274 (errno
== EWOULDBLOCK
) ||
1276 (errno
== ENOMEM
)) {
1277 /* Not really an error - just try again. */
1280 /* Problem with the socket. Set it unreadable. */
1281 tevent_fd_set_flags(fde
, 0);
1285 if ((size_t)received
> sizeof(buf
)) {
1286 /* More than we expected, not for us */
1290 num_fds
= msghdr_extract_fds(&msg
, NULL
, 0);
1294 messaging_dgm_recv(ctx
, ev
, buf
, received
, fds
, 0);
1299 msghdr_extract_fds(&msg
, fds
, num_fds
);
1301 for (i
= 0; i
< num_fds
; i
++) {
1304 err
= prepare_socket_cloexec(fds
[i
]);
1306 close_fd_array(fds
, num_fds
);
1311 messaging_dgm_recv(ctx
, ev
, buf
, received
, fds
, num_fds
);
/*
 * talloc destructor for a partially reassembled incoming message:
 * unlink it from the owning context's in_msgs list.
 * NOTE(review): fragment — original lines 1316 and 1318+ (return value,
 * closing brace) are missing; destructors of this shape normally
 * return 0 — TODO confirm against the upstream file.
 */
1315 static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg
*m
)
1317 DLIST_REMOVE(m
->ctx
->in_msgs
, m
);
1322 * Deal with identification of fragmented messages and
1323 * re-assembly into full messages sent, then calls the
1327 static void messaging_dgm_recv(struct messaging_dgm_context
*ctx
,
1328 struct tevent_context
*ev
,
1329 uint8_t *buf
, size_t buflen
,
1330 int *fds
, size_t num_fds
)
1332 struct messaging_dgm_fragment_hdr hdr
;
1333 struct messaging_dgm_in_msg
*msg
;
1337 if (buflen
< sizeof(cookie
)) {
1340 memcpy(&cookie
, buf
, sizeof(cookie
));
1341 buf
+= sizeof(cookie
);
1342 buflen
-= sizeof(cookie
);
1345 ctx
->recv_cb(ev
, buf
, buflen
, fds
, num_fds
,
1346 ctx
->recv_cb_private_data
);
1350 if (buflen
< sizeof(hdr
)) {
1353 memcpy(&hdr
, buf
, sizeof(hdr
));
1355 buflen
-= sizeof(hdr
);
1357 for (msg
= ctx
->in_msgs
; msg
!= NULL
; msg
= msg
->next
) {
1358 if ((msg
->sender_pid
== hdr
.pid
) &&
1359 (msg
->sender_sock
== hdr
.sock
)) {
1364 if ((msg
!= NULL
) && (msg
->cookie
!= cookie
)) {
1370 msglen
= offsetof(struct messaging_dgm_in_msg
, buf
) +
1373 msg
= talloc_size(ctx
, msglen
);
1377 talloc_set_name_const(msg
, "struct messaging_dgm_in_msg");
1379 *msg
= (struct messaging_dgm_in_msg
) {
1380 .ctx
= ctx
, .msglen
= hdr
.msglen
,
1381 .sender_pid
= hdr
.pid
, .sender_sock
= hdr
.sock
,
1384 DLIST_ADD(ctx
->in_msgs
, msg
);
1385 talloc_set_destructor(msg
, messaging_dgm_in_msg_destructor
);
1388 space
= msg
->msglen
- msg
->received
;
1389 if (buflen
> space
) {
1393 memcpy(msg
->buf
+ msg
->received
, buf
, buflen
);
1394 msg
->received
+= buflen
;
1396 if (msg
->received
< msg
->msglen
) {
1398 * Any valid sender will send the fds in the last
1399 * block. Invalid senders might have sent fd's that we
1400 * need to close here.
1405 DLIST_REMOVE(ctx
->in_msgs
, msg
);
1406 talloc_set_destructor(msg
, NULL
);
1408 ctx
->recv_cb(ev
, msg
->buf
, msg
->msglen
, fds
, num_fds
,
1409 ctx
->recv_cb_private_data
);
1415 close_fd_array(fds
, num_fds
);
1418 void messaging_dgm_destroy(void)
1420 TALLOC_FREE(global_dgm_context
);
1423 int messaging_dgm_send(pid_t pid
,
1424 const struct iovec
*iov
, int iovlen
,
1425 const int *fds
, size_t num_fds
)
1427 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1428 struct messaging_dgm_out
*out
;
1430 unsigned retries
= 0;
1436 messaging_dgm_validate(ctx
);
1439 ret
= messaging_dgm_out_get(ctx
, pid
, &out
);
1444 DEBUG(10, ("%s: Sending message to %u\n", __func__
, (unsigned)pid
));
1446 ret
= messaging_dgm_out_send_fragmented(ctx
->ev
, out
, iov
, iovlen
,
1448 if (ret
== ECONNREFUSED
) {
1450 * We cache outgoing sockets. If the receiver has
1451 * closed and re-opened the socket since our last
1452 * message, we get connection refused. Retry.
/*
 * Read the decimal unique-id line from a lockfile fd into *punique.
 * NOTE(review): fragment — original lines 1466-1469, 1471-1472,
 * 1474-1478, 1480-1483 and the tail after 1484 are missing ('buf'
 * declaration/NUL-termination, pread error handling, strtoull error
 * check, the *punique store and return).
 */
1465 static int messaging_dgm_read_unique(int fd
, uint64_t *punique
)
1470 unsigned long long unique
;
/* pread at offset 0 so repeated calls don't depend on file position. */
1473 rw_ret
= pread(fd
, buf
, sizeof(buf
)-1, 0);
/* Parse the decimal value; 'error' is checked on a missing line. */
1479 unique
= smb_strtoull(buf
, &endptr
, 10, &error
, SMB_STR_STANDARD
);
/* The lockfile writer terminates the value with '\n'; reject otherwise. */
1484 if (endptr
[0] != '\n') {
1491 int messaging_dgm_get_unique(pid_t pid
, uint64_t *unique
)
1493 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1494 struct sun_path_buf lockfile_name
;
1501 messaging_dgm_validate(ctx
);
1503 if (pid
== getpid()) {
1505 * Protect against losing our own lock
1507 return messaging_dgm_read_unique(ctx
->lockfile_fd
, unique
);
1510 ret
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
),
1511 "%s/%u", ctx
->lockfile_dir
.buf
, (int)pid
);
1515 if ((size_t)ret
>= sizeof(lockfile_name
.buf
)) {
1516 return ENAMETOOLONG
;
1519 fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_RDONLY
, 0);
1524 ret
= messaging_dgm_read_unique(fd
, unique
);
1529 int messaging_dgm_cleanup(pid_t pid
)
1531 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1532 struct sun_path_buf lockfile_name
, socket_name
;
1534 struct flock lck
= {
1542 len
= snprintf(socket_name
.buf
, sizeof(socket_name
.buf
), "%s/%u",
1543 ctx
->socket_dir
.buf
, (unsigned)pid
);
1547 if ((size_t)len
>= sizeof(socket_name
.buf
)) {
1548 return ENAMETOOLONG
;
1551 len
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
), "%s/%u",
1552 ctx
->lockfile_dir
.buf
, (unsigned)pid
);
1556 if ((size_t)len
>= sizeof(lockfile_name
.buf
)) {
1557 return ENAMETOOLONG
;
1560 fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_WRONLY
, 0);
1563 if (ret
!= ENOENT
) {
1564 DEBUG(10, ("%s: open(%s) failed: %s\n", __func__
,
1565 lockfile_name
.buf
, strerror(ret
)));
1570 lck
.l_type
= F_WRLCK
;
1571 lck
.l_whence
= SEEK_SET
;
1575 ret
= fcntl(fd
, F_SETLK
, &lck
);
1578 if ((ret
!= EACCES
) && (ret
!= EAGAIN
)) {
1579 DEBUG(10, ("%s: Could not get lock: %s\n", __func__
,
1586 DEBUG(10, ("%s: Cleaning up : %s\n", __func__
, strerror(ret
)));
1588 (void)unlink(socket_name
.buf
);
1589 (void)unlink(lockfile_name
.buf
);
1594 static int messaging_dgm_wipe_fn(pid_t pid
, void *private_data
)
1596 pid_t
*our_pid
= (pid_t
*)private_data
;
1599 if (pid
== *our_pid
) {
1601 * fcntl(F_GETLK) will succeed for ourselves, we hold
1602 * that lock ourselves.
1607 ret
= messaging_dgm_cleanup(pid
);
1608 DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
1609 (unsigned long)pid
, ret
? strerror(ret
) : "ok"));
1614 int messaging_dgm_wipe(void)
1616 pid_t pid
= getpid();
1617 messaging_dgm_forall(messaging_dgm_wipe_fn
, &pid
);
1621 int messaging_dgm_forall(int (*fn
)(pid_t pid
, void *private_data
),
1624 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1633 messaging_dgm_validate(ctx
);
1636 * We scan the socket directory and not the lock directory. Otherwise
1637 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1641 msgdir
= opendir(ctx
->socket_dir
.buf
);
1642 if (msgdir
== NULL
) {
1646 while ((dp
= readdir(msgdir
)) != NULL
) {
1650 pid
= smb_strtoul(dp
->d_name
, NULL
, 10, &error
, SMB_STR_STANDARD
);
1651 if ((pid
== 0) || (error
!= 0)) {
1653 * . and .. and other malformed entries
1658 ret
= fn(pid
, private_data
);
1668 struct messaging_dgm_fde
{
1669 struct tevent_fd
*fde
;
/*
 * talloc destructor for a per-tevent_context registration: if the
 * owning dgm_context is still alive (ctx not NULLed by the context
 * destructor), unlink ourselves from its fde_evs list.
 * NOTE(review): fragment — original lines 1673 and 1676+ (closing
 * brace, return) are missing from this view.
 */
1672 static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev
*fde_ev
)
1674 if (fde_ev
->ctx
!= NULL
) {
1675 DLIST_REMOVE(fde_ev
->ctx
->fde_evs
, fde_ev
)
;
1682 * Reference counter for a struct tevent_fd messaging read event
1683 * (with callback function) on a struct tevent_context registered
1684 * on a messaging context.
1686 * If we've already registered this struct tevent_context before
1687 * (so already have a read event), just increase the reference count.
1689 * Otherwise create a new struct tevent_fd messaging read event on the
1690 * previously unseen struct tevent_context - this is what drives
1691 * the message receive processing.
1695 struct messaging_dgm_fde
*messaging_dgm_register_tevent_context(
1696 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
)
1698 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1699 struct messaging_dgm_fde_ev
*fde_ev
;
1700 struct messaging_dgm_fde
*fde
;
1706 fde
= talloc(mem_ctx
, struct messaging_dgm_fde
);
1711 for (fde_ev
= ctx
->fde_evs
; fde_ev
!= NULL
; fde_ev
= fde_ev
->next
) {
1712 if (tevent_fd_get_flags(fde_ev
->fde
) == 0) {
1714 * If the event context got deleted,
1715 * tevent_fd_get_flags() will return 0
1716 * for the stale fde.
1718 * In that case we should not
1719 * use fde_ev->ev anymore.
1723 if (fde_ev
->ev
== ev
) {
1728 if (fde_ev
== NULL
) {
1729 fde_ev
= talloc(fde
, struct messaging_dgm_fde_ev
);
1730 if (fde_ev
== NULL
) {
1733 fde_ev
->fde
= tevent_add_fd(
1734 ev
, fde_ev
, ctx
->sock
, TEVENT_FD_READ
,
1735 messaging_dgm_read_handler
, ctx
);
1736 if (fde_ev
->fde
== NULL
) {
1742 DLIST_ADD(ctx
->fde_evs
, fde_ev
);
1743 talloc_set_destructor(
1744 fde_ev
, messaging_dgm_fde_ev_destructor
);
1747 * Same trick as with tdb_wrap: The caller will never
1748 * see the talloc_referenced object, the
1749 * messaging_dgm_fde_ev, so problems with
1750 * talloc_unlink will not happen.
1752 if (talloc_reference(fde
, fde_ev
) == NULL
) {
1758 fde
->fde
= fde_ev
->fde
;
/*
 * Report whether a registered messaging fde is still live: a stale fde
 * (its event context deleted) has flags 0 per tevent_fd_get_flags.
 * NOTE(review): fragment — original lines 1763-1768 (likely a NULL
 * guard and the 'flags' declaration) and the closing brace are missing.
 */
1762 bool messaging_dgm_fde_active(struct messaging_dgm_fde
*fde
)
1769 flags
= tevent_fd_get_flags(fde
->fde
);
1770 return (flags
!= 0);