2 * Unix SMB/CIFS implementation.
3 * Samba internal messaging functions
4 * Copyright (C) 2013 by Volker Lendecke
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "util/util.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/select.h"
26 #include "lib/util/debug.h"
27 #include "messages_dgm.h"
28 #include "lib/util/genrand.h"
29 #include "lib/util/dlinklist.h"
30 #include "lib/pthreadpool/pthreadpool_tevent.h"
31 #include "lib/util/msghdr.h"
32 #include "lib/util/iov_buf.h"
33 #include "lib/util/blocking.h"
34 #include "lib/util/tevent_unix.h"
35 #include "lib/util/smb_strtox.h"
37 #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
41 * This will carry enough for a socket path
43 char buf
[sizeof(struct sockaddr_un
)];
47 * We can only have one tevent_fd per dgm_context and per
48 * tevent_context. Maintain a list of registered tevent_contexts per
51 struct messaging_dgm_fde_ev
{
52 struct messaging_dgm_fde_ev
*prev
, *next
;
55 * Backreference to enable DLIST_REMOVE from our
56 * destructor. Also, set to NULL when the dgm_context dies
57 * before the messaging_dgm_fde_ev.
59 struct messaging_dgm_context
*ctx
;
61 struct tevent_context
*ev
;
62 struct tevent_fd
*fde
;
65 struct messaging_dgm_out
{
66 struct messaging_dgm_out
*prev
, *next
;
67 struct messaging_dgm_context
*ctx
;
74 struct tevent_queue
*queue
;
75 struct tevent_timer
*idle_timer
;
78 struct messaging_dgm_in_msg
{
79 struct messaging_dgm_in_msg
*prev
, *next
;
80 struct messaging_dgm_context
*ctx
;
89 struct messaging_dgm_context
{
90 struct tevent_context
*ev
;
92 struct sun_path_buf socket_dir
;
93 struct sun_path_buf lockfile_dir
;
97 struct messaging_dgm_in_msg
*in_msgs
;
99 struct messaging_dgm_fde_ev
*fde_evs
;
100 void (*recv_cb
)(struct tevent_context
*ev
,
106 void *recv_cb_private_data
;
108 bool *have_dgm_context
;
110 struct pthreadpool_tevent
*pool
;
111 struct messaging_dgm_out
*outsocks
;
114 /* Set socket close on exec. */
115 static int prepare_socket_cloexec(int sock
)
120 flags
= fcntl(sock
, F_GETFD
, 0);
125 if (fcntl(sock
, F_SETFD
, flags
) == -1) {
132 static void close_fd_array(int *fds
, size_t num_fds
)
136 for (i
= 0; i
< num_fds
; i
++) {
147 * The idle handler can free the struct messaging_dgm_out *,
148 * if it's unused (qlen of zero) which closes the socket.
151 static void messaging_dgm_out_idle_handler(struct tevent_context
*ev
,
152 struct tevent_timer
*te
,
153 struct timeval current_time
,
156 struct messaging_dgm_out
*out
= talloc_get_type_abort(
157 private_data
, struct messaging_dgm_out
);
160 out
->idle_timer
= NULL
;
162 qlen
= tevent_queue_length(out
->queue
);
169 * Setup the idle handler to fire after 1 second if the
173 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out
*out
)
177 qlen
= tevent_queue_length(out
->queue
);
179 TALLOC_FREE(out
->idle_timer
);
183 if (out
->idle_timer
!= NULL
) {
184 tevent_update_timer(out
->idle_timer
,
185 tevent_timeval_current_ofs(1, 0));
189 out
->idle_timer
= tevent_add_timer(
190 out
->ctx
->ev
, out
, tevent_timeval_current_ofs(1, 0),
191 messaging_dgm_out_idle_handler
, out
);
193 * No NULL check, we'll come back here. Worst case we're
198 static int messaging_dgm_out_destructor(struct messaging_dgm_out
*dst
);
199 static void messaging_dgm_out_idle_handler(struct tevent_context
*ev
,
200 struct tevent_timer
*te
,
201 struct timeval current_time
,
205 * Connect to an existing rendezvous point for another
206 * pid - wrapped inside a struct messaging_dgm_out *.
209 static int messaging_dgm_out_create(TALLOC_CTX
*mem_ctx
,
210 struct messaging_dgm_context
*ctx
,
211 pid_t pid
, struct messaging_dgm_out
**pout
)
213 struct messaging_dgm_out
*out
;
214 struct sockaddr_un addr
= { .sun_family
= AF_UNIX
};
217 char addr_buf
[sizeof(addr
.sun_path
) + (3 * sizeof(unsigned) + 2)];
219 out
= talloc(mem_ctx
, struct messaging_dgm_out
);
224 *out
= (struct messaging_dgm_out
) {
230 out_pathlen
= snprintf(addr_buf
, sizeof(addr_buf
),
231 "%s/%u", ctx
->socket_dir
.buf
, (unsigned)pid
);
232 if (out_pathlen
< 0) {
235 if ((size_t)out_pathlen
>= sizeof(addr
.sun_path
)) {
240 memcpy(addr
.sun_path
, addr_buf
, out_pathlen
+ 1);
242 out
->queue
= tevent_queue_create(out
, addr
.sun_path
);
243 if (out
->queue
== NULL
) {
248 out
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
249 if (out
->sock
== -1) {
253 DLIST_ADD(ctx
->outsocks
, out
);
254 talloc_set_destructor(out
, messaging_dgm_out_destructor
);
257 ret
= connect(out
->sock
,
258 (const struct sockaddr
*)(const void *)&addr
,
260 } while ((ret
== -1) && (errno
== EINTR
));
266 ret
= set_blocking(out
->sock
, false);
270 out
->is_blocking
= false;
281 static int messaging_dgm_out_destructor(struct messaging_dgm_out
*out
)
283 DLIST_REMOVE(out
->ctx
->outsocks
, out
);
285 if ((tevent_queue_length(out
->queue
) != 0) &&
286 (tevent_cached_getpid() == out
->ctx
->pid
)) {
288 * We have pending jobs. We can't close the socket,
289 * this has been handed over to messaging_dgm_out_queue_state.
294 if (out
->sock
!= -1) {
302 * Find the struct messaging_dgm_out * to talk to pid.
303 * If we don't have one, create it. Set the timer to
304 * delete after 1 sec.
307 static int messaging_dgm_out_get(struct messaging_dgm_context
*ctx
, pid_t pid
,
308 struct messaging_dgm_out
**pout
)
310 struct messaging_dgm_out
*out
;
313 for (out
= ctx
->outsocks
; out
!= NULL
; out
= out
->next
) {
314 if (out
->pid
== pid
) {
320 ret
= messaging_dgm_out_create(ctx
, ctx
, pid
, &out
);
327 * shouldn't be possible, should be set if messaging_dgm_out_create
328 * succeeded. This check is to satisfy static checker
333 messaging_dgm_out_rearm_idle_timer(out
);
340 * This function is called directly to send a message fragment
341 * when the outgoing queue is zero, and from a pthreadpool
342 * job thread when messages are being queued (qlen != 0).
343 * Make sure *ONLY* thread-safe functions are called within.
346 static ssize_t
messaging_dgm_sendmsg(int sock
,
347 const struct iovec
*iov
, int iovlen
,
348 const int *fds
, size_t num_fds
,
355 * Do the actual sendmsg syscall. This will be called from a
356 * pthreadpool helper thread, so be careful what you do here.
359 msg
= (struct msghdr
) {
360 .msg_iov
= discard_const_p(struct iovec
, iov
),
364 fdlen
= msghdr_prep_fds(&msg
, NULL
, 0, fds
, num_fds
);
373 msghdr_prep_fds(&msg
, buf
, fdlen
, fds
, num_fds
);
376 ret
= sendmsg(sock
, &msg
, 0);
377 } while ((ret
== -1) && (errno
== EINTR
));
386 struct messaging_dgm_out_queue_state
{
387 struct tevent_context
*ev
;
388 struct pthreadpool_tevent
*pool
;
390 struct tevent_req
*req
;
391 struct tevent_req
*subreq
;
402 static int messaging_dgm_out_queue_state_destructor(
403 struct messaging_dgm_out_queue_state
*state
);
404 static void messaging_dgm_out_queue_trigger(struct tevent_req
*req
,
406 static void messaging_dgm_out_threaded_job(void *private_data
);
407 static void messaging_dgm_out_queue_done(struct tevent_req
*subreq
);
410 * Push a message fragment onto a queue to be sent by a
411 * threadpool job. Makes copies of data/fd's to be sent.
412 * The running tevent_queue internally creates an immediate
413 * event to schedule the write.
416 static struct tevent_req
*messaging_dgm_out_queue_send(
417 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
418 struct messaging_dgm_out
*out
,
419 const struct iovec
*iov
, int iovlen
, const int *fds
, size_t num_fds
)
421 struct tevent_req
*req
;
422 struct messaging_dgm_out_queue_state
*state
;
423 struct tevent_queue_entry
*e
;
427 req
= tevent_req_create(out
, &state
,
428 struct messaging_dgm_out_queue_state
);
433 state
->pool
= out
->ctx
->pool
;
434 state
->sock
= out
->sock
;
438 * Go blocking in a thread
440 if (!out
->is_blocking
) {
441 int ret
= set_blocking(out
->sock
, true);
443 tevent_req_error(req
, errno
);
444 return tevent_req_post(req
, ev
);
446 out
->is_blocking
= true;
449 buflen
= iov_buflen(iov
, iovlen
);
451 tevent_req_error(req
, EMSGSIZE
);
452 return tevent_req_post(req
, ev
);
455 state
->buf
= talloc_array(state
, uint8_t, buflen
);
456 if (tevent_req_nomem(state
->buf
, req
)) {
457 return tevent_req_post(req
, ev
);
459 iov_buf(iov
, iovlen
, state
->buf
, buflen
);
461 state
->fds
= talloc_array(state
, int, num_fds
);
462 if (tevent_req_nomem(state
->fds
, req
)) {
463 return tevent_req_post(req
, ev
);
466 for (i
=0; i
<num_fds
; i
++) {
470 for (i
=0; i
<num_fds
; i
++) {
472 state
->fds
[i
] = dup(fds
[i
]);
474 if (state
->fds
[i
] == -1) {
477 close_fd_array(state
->fds
, num_fds
);
479 tevent_req_error(req
, ret
);
480 return tevent_req_post(req
, ev
);
484 talloc_set_destructor(state
, messaging_dgm_out_queue_state_destructor
);
486 e
= tevent_queue_add_entry(out
->queue
, ev
, req
,
487 messaging_dgm_out_queue_trigger
, req
);
488 if (tevent_req_nomem(e
, req
)) {
489 return tevent_req_post(req
, ev
);
494 static int messaging_dgm_out_queue_state_destructor(
495 struct messaging_dgm_out_queue_state
*state
)
500 if (state
->subreq
!= NULL
) {
502 * We're scheduled, but we're destroyed. This happens
503 * if the messaging_dgm_context is destroyed while
504 * we're stuck in a blocking send. There's nothing we
505 * can do but to leak memory.
507 TALLOC_FREE(state
->subreq
);
508 (void)talloc_reparent(state
->req
, NULL
, state
);
513 num_fds
= talloc_array_length(fds
);
514 close_fd_array(fds
, num_fds
);
519 * tevent_queue callback that schedules the pthreadpool to actually
520 * send the queued message fragment.
523 static void messaging_dgm_out_queue_trigger(struct tevent_req
*req
,
526 struct messaging_dgm_out_queue_state
*state
= tevent_req_data(
527 req
, struct messaging_dgm_out_queue_state
);
529 tevent_req_reset_endtime(req
);
531 state
->subreq
= pthreadpool_tevent_job_send(
532 state
, state
->ev
, state
->pool
,
533 messaging_dgm_out_threaded_job
, state
);
534 if (tevent_req_nomem(state
->subreq
, req
)) {
537 tevent_req_set_callback(state
->subreq
, messaging_dgm_out_queue_done
,
542 * Wrapper function run by the pthread that calls
543 * messaging_dgm_sendmsg() to actually do the sendmsg().
546 static void messaging_dgm_out_threaded_job(void *private_data
)
548 struct messaging_dgm_out_queue_state
*state
= talloc_get_type_abort(
549 private_data
, struct messaging_dgm_out_queue_state
);
551 struct iovec iov
= { .iov_base
= state
->buf
,
552 .iov_len
= talloc_get_size(state
->buf
) };
553 size_t num_fds
= talloc_array_length(state
->fds
);
559 state
->sent
= messaging_dgm_sendmsg(state
->sock
, &iov
, 1,
560 state
->fds
, num_fds
, &state
->err
);
562 if (state
->sent
!= -1) {
565 if (state
->err
!= ENOBUFS
) {
570 * ENOBUFS is the FreeBSD way of saying "Try
571 * again". We have to do polling.
574 ret
= poll(NULL
, 0, msec
);
575 } while ((ret
== -1) && (errno
== EINTR
));
578 * Exponential backoff up to once a second
581 msec
= MIN(msec
, 1000);
586 * Pickup the results of the pthread sendmsg().
589 static void messaging_dgm_out_queue_done(struct tevent_req
*subreq
)
591 struct tevent_req
*req
= tevent_req_callback_data(
592 subreq
, struct tevent_req
);
593 struct messaging_dgm_out_queue_state
*state
= tevent_req_data(
594 req
, struct messaging_dgm_out_queue_state
);
597 if (subreq
!= state
->subreq
) {
601 ret
= pthreadpool_tevent_job_recv(subreq
);
604 state
->subreq
= NULL
;
606 if (tevent_req_error(req
, ret
)) {
609 if (state
->sent
== -1) {
610 tevent_req_error(req
, state
->err
);
613 tevent_req_done(req
);
616 static int messaging_dgm_out_queue_recv(struct tevent_req
*req
)
618 return tevent_req_simple_recv_unix(req
);
621 static void messaging_dgm_out_sent_fragment(struct tevent_req
*req
);
624 * Core function to send a message fragment given a
625 * connected struct messaging_dgm_out * destination.
626 * If no current queue tries to send nonblocking
627 * directly. If not, queues the fragment (which makes
628 * a copy of it) and adds a 60-second timeout on the send.
631 static int messaging_dgm_out_send_fragment(
632 struct tevent_context
*ev
, struct messaging_dgm_out
*out
,
633 const struct iovec
*iov
, int iovlen
, const int *fds
, size_t num_fds
)
635 struct tevent_req
*req
;
639 qlen
= tevent_queue_length(out
->queue
);
644 if (out
->is_blocking
) {
645 int ret
= set_blocking(out
->sock
, false);
649 out
->is_blocking
= false;
652 nsent
= messaging_dgm_sendmsg(out
->sock
, iov
, iovlen
, fds
,
658 if (err
== ENOBUFS
) {
660 * FreeBSD's way of telling us the dst socket
661 * is full. EWOULDBLOCK makes us spawn a
662 * polling helper thread.
667 if (err
!= EWOULDBLOCK
) {
672 req
= messaging_dgm_out_queue_send(out
, ev
, out
, iov
, iovlen
,
677 tevent_req_set_callback(req
, messaging_dgm_out_sent_fragment
, out
);
679 ok
= tevent_req_set_endtime(req
, ev
,
680 tevent_timeval_current_ofs(60, 0));
690 * Pickup the result of the fragment send. Reset idle timer
694 static void messaging_dgm_out_sent_fragment(struct tevent_req
*req
)
696 struct messaging_dgm_out
*out
= tevent_req_callback_data(
697 req
, struct messaging_dgm_out
);
700 ret
= messaging_dgm_out_queue_recv(req
);
704 DBG_WARNING("messaging_out_queue_recv returned %s\n",
708 messaging_dgm_out_rearm_idle_timer(out
);
712 struct messaging_dgm_fragment_hdr
{
719 * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
720 * size chunks and send it.
722 * Message fragments are prefixed by a 64-bit cookie that
723 * stays the same for all fragments. This allows the receiver
724 * to recognise fragments of the same message and re-assemble
725 * them on the other end.
727 * Note that this allows other message fragments from other
728 * senders to be interleaved in the receive read processing,
729 * the combination of the cookie and header info allows unique
730 * identification of the message from a specific sender in
733 * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
734 * then send a single message with cookie set to zero.
736 * Otherwise the message is fragmented into chunks and added
737 * to the sending queue. Any file descriptors are passed only
738 * in the last fragment.
740 * Finally the cookie is incremented (wrap over zero) to
741 * prepare for the next message sent to this channel.
745 static int messaging_dgm_out_send_fragmented(struct tevent_context
*ev
,
746 struct messaging_dgm_out
*out
,
747 const struct iovec
*iov
,
749 const int *fds
, size_t num_fds
)
751 ssize_t msglen
, sent
;
753 struct iovec iov_copy
[iovlen
+2];
754 struct messaging_dgm_fragment_hdr hdr
;
755 struct iovec src_iov
;
761 msglen
= iov_buflen(iov
, iovlen
);
765 if (num_fds
> INT8_MAX
) {
769 if ((size_t) msglen
<=
770 (MESSAGING_DGM_FRAGMENT_LENGTH
- sizeof(uint64_t))) {
773 iov_copy
[0].iov_base
= &cookie
;
774 iov_copy
[0].iov_len
= sizeof(cookie
);
776 memcpy(&iov_copy
[1], iov
,
777 sizeof(struct iovec
) * iovlen
);
780 return messaging_dgm_out_send_fragment(
781 ev
, out
, iov_copy
, iovlen
+1, fds
, num_fds
);
785 hdr
= (struct messaging_dgm_fragment_hdr
) {
787 .pid
= tevent_cached_getpid(),
791 iov_copy
[0].iov_base
= &out
->cookie
;
792 iov_copy
[0].iov_len
= sizeof(out
->cookie
);
793 iov_copy
[1].iov_base
= &hdr
;
794 iov_copy
[1].iov_len
= sizeof(hdr
);
800 * The following write loop sends the user message in pieces. We have
801 * filled the first two iovecs above with "cookie" and "hdr". In the
802 * following loops we pull message chunks from the user iov array and
803 * fill iov_copy piece by piece, possibly truncating chunks from the
804 * caller's iov array. Ugly, but hopefully efficient.
807 while (sent
< msglen
) {
809 size_t iov_index
= 2;
811 fragment_len
= sizeof(out
->cookie
) + sizeof(hdr
);
813 while (fragment_len
< MESSAGING_DGM_FRAGMENT_LENGTH
) {
816 space
= MESSAGING_DGM_FRAGMENT_LENGTH
- fragment_len
;
817 chunk
= MIN(space
, src_iov
.iov_len
);
819 iov_copy
[iov_index
].iov_base
= src_iov
.iov_base
;
820 iov_copy
[iov_index
].iov_len
= chunk
;
823 src_iov
.iov_base
= (char *)src_iov
.iov_base
+ chunk
;
824 src_iov
.iov_len
-= chunk
;
825 fragment_len
+= chunk
;
827 if (src_iov
.iov_len
== 0) {
836 sent
+= (fragment_len
- sizeof(out
->cookie
) - sizeof(hdr
));
839 * only the last fragment should pass the fd array.
840 * That simplifies the receiver a lot.
843 ret
= messaging_dgm_out_send_fragment(
844 ev
, out
, iov_copy
, iov_index
, NULL
, 0);
846 ret
= messaging_dgm_out_send_fragment(
847 ev
, out
, iov_copy
, iov_index
, fds
, num_fds
);
855 if (out
->cookie
== 0) {
862 static struct messaging_dgm_context
*global_dgm_context
;
864 static int messaging_dgm_context_destructor(struct messaging_dgm_context
*c
);
866 static int messaging_dgm_lockfile_create(struct messaging_dgm_context
*ctx
,
867 pid_t pid
, int *plockfile_fd
,
872 struct sun_path_buf lockfile_name
;
878 ret
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
),
879 "%s/%u", ctx
->lockfile_dir
.buf
, (unsigned)pid
);
883 if ((unsigned)ret
>= sizeof(lockfile_name
.buf
)) {
887 /* no O_EXCL, existence check is via the fcntl lock */
889 lockfile_fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_CREAT
|O_RDWR
,
892 if ((lockfile_fd
== -1) &&
893 ((errno
== ENXIO
) /* Linux */ ||
894 (errno
== ENODEV
) /* Linux kernel bug */ ||
895 (errno
== EOPNOTSUPP
) /* FreeBSD */)) {
897 * Huh -- a socket? This might be a stale socket from
898 * an upgrade of Samba. Just unlink and retry, nobody
899 * else is supposed to be here at this time.
901 * Yes, this is racy, but I don't see a way to deal
902 * with this properly.
904 unlink(lockfile_name
.buf
);
906 lockfile_fd
= open(lockfile_name
.buf
,
907 O_NONBLOCK
|O_CREAT
|O_WRONLY
,
911 if (lockfile_fd
== -1) {
913 DEBUG(1, ("%s: open failed: %s\n", __func__
, strerror(errno
)));
917 lck
= (struct flock
) {
922 ret
= fcntl(lockfile_fd
, F_SETLK
, &lck
);
925 DEBUG(1, ("%s: fcntl failed: %s\n", __func__
, strerror(ret
)));
930 * Directly using the binary value for
931 * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
932 * violation. But including all of ndr here just for this
933 * seems to be a bit overkill to me. Also, messages_dgm might
934 * be replaced sooner or later by something streams-based,
935 * where unique_id generation will be handled differently.
939 generate_random_buffer((uint8_t *)&unique
, sizeof(unique
));
940 } while (unique
== UINT64_C(0xFFFFFFFFFFFFFFFF));
942 unique_len
= snprintf(buf
, sizeof(buf
), "%"PRIu64
"\n", unique
);
944 /* shorten a potentially preexisting file */
946 ret
= ftruncate(lockfile_fd
, unique_len
);
949 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__
,
954 written
= write(lockfile_fd
, buf
, unique_len
);
955 if (written
!= unique_len
) {
957 DEBUG(1, ("%s: write failed: %s\n", __func__
, strerror(ret
)));
961 *plockfile_fd
= lockfile_fd
;
966 unlink(lockfile_name
.buf
);
972 static void messaging_dgm_read_handler(struct tevent_context
*ev
,
973 struct tevent_fd
*fde
,
978 * Create the rendezvous point in the file system
979 * that other processes can use to send messages to
983 int messaging_dgm_init(struct tevent_context
*ev
,
985 const char *socket_dir
,
986 const char *lockfile_dir
,
987 void (*recv_cb
)(struct tevent_context
*ev
,
993 void *recv_cb_private_data
)
995 struct messaging_dgm_context
*ctx
;
997 struct sockaddr_un socket_address
;
999 static bool have_dgm_context
= false;
1001 if (have_dgm_context
) {
1005 if ((socket_dir
== NULL
) || (lockfile_dir
== NULL
)) {
1009 ctx
= talloc_zero(NULL
, struct messaging_dgm_context
);
1014 ctx
->pid
= tevent_cached_getpid();
1015 ctx
->recv_cb
= recv_cb
;
1016 ctx
->recv_cb_private_data
= recv_cb_private_data
;
1018 len
= strlcpy(ctx
->lockfile_dir
.buf
, lockfile_dir
,
1019 sizeof(ctx
->lockfile_dir
.buf
));
1020 if (len
>= sizeof(ctx
->lockfile_dir
.buf
)) {
1022 return ENAMETOOLONG
;
1025 len
= strlcpy(ctx
->socket_dir
.buf
, socket_dir
,
1026 sizeof(ctx
->socket_dir
.buf
));
1027 if (len
>= sizeof(ctx
->socket_dir
.buf
)) {
1029 return ENAMETOOLONG
;
1032 socket_address
= (struct sockaddr_un
) { .sun_family
= AF_UNIX
};
1033 len
= snprintf(socket_address
.sun_path
,
1034 sizeof(socket_address
.sun_path
),
1035 "%s/%u", socket_dir
, (unsigned)ctx
->pid
);
1036 if (len
>= sizeof(socket_address
.sun_path
)) {
1038 return ENAMETOOLONG
;
1041 ret
= messaging_dgm_lockfile_create(ctx
, ctx
->pid
, &ctx
->lockfile_fd
,
1044 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1045 __func__
, strerror(ret
)));
1050 unlink(socket_address
.sun_path
);
1052 ctx
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
1053 if (ctx
->sock
== -1) {
1055 DBG_WARNING("socket failed: %s\n", strerror(ret
));
1060 ret
= prepare_socket_cloexec(ctx
->sock
);
1063 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1069 ret
= bind(ctx
->sock
, (struct sockaddr
*)(void *)&socket_address
,
1070 sizeof(socket_address
));
1073 DBG_WARNING("bind failed: %s\n", strerror(ret
));
1078 talloc_set_destructor(ctx
, messaging_dgm_context_destructor
);
1080 ctx
->have_dgm_context
= &have_dgm_context
;
1082 ret
= pthreadpool_tevent_init(ctx
, UINT_MAX
, &ctx
->pool
);
1084 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1090 global_dgm_context
= ctx
;
1099 * Remove the rendezvous point in the filesystem
1100 * if we're the owner.
1103 static int messaging_dgm_context_destructor(struct messaging_dgm_context
*c
)
1105 while (c
->outsocks
!= NULL
) {
1106 TALLOC_FREE(c
->outsocks
);
1108 while (c
->in_msgs
!= NULL
) {
1109 TALLOC_FREE(c
->in_msgs
);
1111 while (c
->fde_evs
!= NULL
) {
1112 tevent_fd_set_flags(c
->fde_evs
->fde
, 0);
1113 c
->fde_evs
->ctx
= NULL
;
1114 DLIST_REMOVE(c
->fde_evs
, c
->fde_evs
);
1119 if (tevent_cached_getpid() == c
->pid
) {
1120 struct sun_path_buf name
;
1123 ret
= snprintf(name
.buf
, sizeof(name
.buf
), "%s/%u",
1124 c
->socket_dir
.buf
, (unsigned)c
->pid
);
1125 if ((ret
< 0) || ((size_t)ret
>= sizeof(name
.buf
))) {
1127 * We've checked the length when creating, so this
1128 * should never happen
1134 ret
= snprintf(name
.buf
, sizeof(name
.buf
), "%s/%u",
1135 c
->lockfile_dir
.buf
, (unsigned)c
->pid
);
1136 if ((ret
< 0) || ((size_t)ret
>= sizeof(name
.buf
))) {
1138 * We've checked the length when creating, so this
1139 * should never happen
1145 close(c
->lockfile_fd
);
1147 if (c
->have_dgm_context
!= NULL
) {
1148 *c
->have_dgm_context
= false;
1154 static void messaging_dgm_validate(struct messaging_dgm_context
*ctx
)
1157 pid_t pid
= tevent_cached_getpid();
1158 struct sockaddr_storage addr
;
1159 socklen_t addrlen
= sizeof(addr
);
1160 struct sockaddr_un
*un_addr
;
1161 struct sun_path_buf pathbuf
;
1162 struct stat st1
, st2
;
1166 * Protect against using the wrong messaging context after a
1167 * fork without reinit_after_fork.
1170 ret
= getsockname(ctx
->sock
, (struct sockaddr
*)&addr
, &addrlen
);
1172 DBG_ERR("getsockname failed: %s\n", strerror(errno
));
1175 if (addr
.ss_family
!= AF_UNIX
) {
1176 DBG_ERR("getsockname returned family %d\n",
1177 (int)addr
.ss_family
);
1180 un_addr
= (struct sockaddr_un
*)&addr
;
1182 ret
= snprintf(pathbuf
.buf
, sizeof(pathbuf
.buf
),
1183 "%s/%u", ctx
->socket_dir
.buf
, (unsigned)pid
);
1185 DBG_ERR("snprintf failed: %s\n", strerror(errno
));
1188 if ((size_t)ret
>= sizeof(pathbuf
.buf
)) {
1189 DBG_ERR("snprintf returned %d chars\n", (int)ret
);
1193 if (strcmp(pathbuf
.buf
, un_addr
->sun_path
) != 0) {
1194 DBG_ERR("sockname wrong: Expected %s, got %s\n",
1195 pathbuf
.buf
, un_addr
->sun_path
);
1199 ret
= snprintf(pathbuf
.buf
, sizeof(pathbuf
.buf
),
1200 "%s/%u", ctx
->lockfile_dir
.buf
, (unsigned)pid
);
1202 DBG_ERR("snprintf failed: %s\n", strerror(errno
));
1205 if ((size_t)ret
>= sizeof(pathbuf
.buf
)) {
1206 DBG_ERR("snprintf returned %d chars\n", (int)ret
);
1210 ret
= stat(pathbuf
.buf
, &st1
);
1212 DBG_ERR("stat failed: %s\n", strerror(errno
));
1215 ret
= fstat(ctx
->lockfile_fd
, &st2
);
1217 DBG_ERR("fstat failed: %s\n", strerror(errno
));
1221 if ((st1
.st_dev
!= st2
.st_dev
) || (st1
.st_ino
!= st2
.st_ino
)) {
1222 DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
1223 (int)st2
.st_dev
, (int)st2
.st_ino
,
1224 (int)st1
.st_dev
, (int)st1
.st_ino
);
1236 static void messaging_dgm_recv(struct messaging_dgm_context
*ctx
,
1237 struct tevent_context
*ev
,
1238 uint8_t *msg
, size_t msg_len
,
1239 int *fds
, size_t num_fds
);
1242 * Raw read callback handler - passes to messaging_dgm_recv()
1243 * for fragment reassembly processing.
1246 static void messaging_dgm_read_handler(struct tevent_context
*ev
,
1247 struct tevent_fd
*fde
,
1251 struct messaging_dgm_context
*ctx
= talloc_get_type_abort(
1252 private_data
, struct messaging_dgm_context
);
1256 size_t msgbufsize
= msghdr_prep_recv_fds(NULL
, NULL
, 0, INT8_MAX
);
1257 uint8_t msgbuf
[msgbufsize
];
1258 uint8_t buf
[MESSAGING_DGM_FRAGMENT_LENGTH
];
1261 messaging_dgm_validate(ctx
);
1263 if ((flags
& TEVENT_FD_READ
) == 0) {
1267 iov
= (struct iovec
) { .iov_base
= buf
, .iov_len
= sizeof(buf
) };
1268 msg
= (struct msghdr
) { .msg_iov
= &iov
, .msg_iovlen
= 1 };
1270 msghdr_prep_recv_fds(&msg
, msgbuf
, msgbufsize
, INT8_MAX
);
1272 #ifdef MSG_CMSG_CLOEXEC
1273 msg
.msg_flags
|= MSG_CMSG_CLOEXEC
;
1276 received
= recvmsg(ctx
->sock
, &msg
, 0);
1277 if (received
== -1) {
1278 if ((errno
== EAGAIN
) ||
1279 (errno
== EWOULDBLOCK
) ||
1281 (errno
== ENOMEM
)) {
1282 /* Not really an error - just try again. */
1285 /* Problem with the socket. Set it unreadable. */
1286 tevent_fd_set_flags(fde
, 0);
1290 if ((size_t)received
> sizeof(buf
)) {
1291 /* More than we expected, not for us */
1295 num_fds
= msghdr_extract_fds(&msg
, NULL
, 0);
1299 messaging_dgm_recv(ctx
, ev
, buf
, received
, fds
, 0);
1304 msghdr_extract_fds(&msg
, fds
, num_fds
);
1306 for (i
= 0; i
< num_fds
; i
++) {
1309 err
= prepare_socket_cloexec(fds
[i
]);
1311 close_fd_array(fds
, num_fds
);
1316 messaging_dgm_recv(ctx
, ev
, buf
, received
, fds
, num_fds
);
1320 static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg
*m
)
1322 DLIST_REMOVE(m
->ctx
->in_msgs
, m
);
1326 static void messaging_dgm_close_unconsumed(int *fds
, size_t num_fds
)
1330 for (i
=0; i
<num_fds
; i
++) {
1339 * Deal with identification of fragmented messages and
1340 * re-assembly into full messages sent, then calls the
1344 static void messaging_dgm_recv(struct messaging_dgm_context
*ctx
,
1345 struct tevent_context
*ev
,
1346 uint8_t *buf
, size_t buflen
,
1347 int *fds
, size_t num_fds
)
1349 struct messaging_dgm_fragment_hdr hdr
;
1350 struct messaging_dgm_in_msg
*msg
;
1354 if (buflen
< sizeof(cookie
)) {
1357 memcpy(&cookie
, buf
, sizeof(cookie
));
1358 buf
+= sizeof(cookie
);
1359 buflen
-= sizeof(cookie
);
1362 ctx
->recv_cb(ev
, buf
, buflen
, fds
, num_fds
,
1363 ctx
->recv_cb_private_data
);
1364 messaging_dgm_close_unconsumed(fds
, num_fds
);
1368 if (buflen
< sizeof(hdr
)) {
1371 memcpy(&hdr
, buf
, sizeof(hdr
));
1373 buflen
-= sizeof(hdr
);
1375 for (msg
= ctx
->in_msgs
; msg
!= NULL
; msg
= msg
->next
) {
1376 if ((msg
->sender_pid
== hdr
.pid
) &&
1377 (msg
->sender_sock
== hdr
.sock
)) {
1382 if ((msg
!= NULL
) && (msg
->cookie
!= cookie
)) {
1388 msglen
= offsetof(struct messaging_dgm_in_msg
, buf
) +
1391 msg
= talloc_size(ctx
, msglen
);
1395 talloc_set_name_const(msg
, "struct messaging_dgm_in_msg");
1397 *msg
= (struct messaging_dgm_in_msg
) {
1398 .ctx
= ctx
, .msglen
= hdr
.msglen
,
1399 .sender_pid
= hdr
.pid
, .sender_sock
= hdr
.sock
,
1402 DLIST_ADD(ctx
->in_msgs
, msg
);
1403 talloc_set_destructor(msg
, messaging_dgm_in_msg_destructor
);
1406 space
= msg
->msglen
- msg
->received
;
1407 if (buflen
> space
) {
1411 memcpy(msg
->buf
+ msg
->received
, buf
, buflen
);
1412 msg
->received
+= buflen
;
1414 if (msg
->received
< msg
->msglen
) {
1416 * Any valid sender will send the fds in the last
1417 * block. Invalid senders might have sent fd's that we
1418 * need to close here.
1423 DLIST_REMOVE(ctx
->in_msgs
, msg
);
1424 talloc_set_destructor(msg
, NULL
);
1426 ctx
->recv_cb(ev
, msg
->buf
, msg
->msglen
, fds
, num_fds
,
1427 ctx
->recv_cb_private_data
);
1428 messaging_dgm_close_unconsumed(fds
, num_fds
);
1434 close_fd_array(fds
, num_fds
);
1437 void messaging_dgm_destroy(void)
1439 TALLOC_FREE(global_dgm_context
);
1442 int messaging_dgm_send(pid_t pid
,
1443 const struct iovec
*iov
, int iovlen
,
1444 const int *fds
, size_t num_fds
)
1446 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1447 struct messaging_dgm_out
*out
;
1449 unsigned retries
= 0;
1455 messaging_dgm_validate(ctx
);
1458 ret
= messaging_dgm_out_get(ctx
, pid
, &out
);
1463 DEBUG(10, ("%s: Sending message to %u\n", __func__
, (unsigned)pid
));
1465 ret
= messaging_dgm_out_send_fragmented(ctx
->ev
, out
, iov
, iovlen
,
1467 if (ret
== ECONNREFUSED
) {
1469 * We cache outgoing sockets. If the receiver has
1470 * closed and re-opened the socket since our last
1471 * message, we get connection refused. Retry.
1484 static int messaging_dgm_read_unique(int fd
, uint64_t *punique
)
1489 unsigned long long unique
;
1492 rw_ret
= pread(fd
, buf
, sizeof(buf
)-1, 0);
1498 unique
= smb_strtoull(buf
, &endptr
, 10, &error
, SMB_STR_STANDARD
);
1503 if (endptr
[0] != '\n') {
1510 int messaging_dgm_get_unique(pid_t pid
, uint64_t *unique
)
1512 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1513 struct sun_path_buf lockfile_name
;
1520 messaging_dgm_validate(ctx
);
1522 if (pid
== tevent_cached_getpid()) {
1524 * Protect against losing our own lock
1526 return messaging_dgm_read_unique(ctx
->lockfile_fd
, unique
);
1529 ret
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
),
1530 "%s/%u", ctx
->lockfile_dir
.buf
, (int)pid
);
1534 if ((size_t)ret
>= sizeof(lockfile_name
.buf
)) {
1535 return ENAMETOOLONG
;
1538 fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_RDONLY
, 0);
1543 ret
= messaging_dgm_read_unique(fd
, unique
);
1548 int messaging_dgm_cleanup(pid_t pid
)
1550 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1551 struct sun_path_buf lockfile_name
, socket_name
;
1553 struct flock lck
= {
1561 len
= snprintf(socket_name
.buf
, sizeof(socket_name
.buf
), "%s/%u",
1562 ctx
->socket_dir
.buf
, (unsigned)pid
);
1566 if ((size_t)len
>= sizeof(socket_name
.buf
)) {
1567 return ENAMETOOLONG
;
1570 len
= snprintf(lockfile_name
.buf
, sizeof(lockfile_name
.buf
), "%s/%u",
1571 ctx
->lockfile_dir
.buf
, (unsigned)pid
);
1575 if ((size_t)len
>= sizeof(lockfile_name
.buf
)) {
1576 return ENAMETOOLONG
;
1579 fd
= open(lockfile_name
.buf
, O_NONBLOCK
|O_WRONLY
, 0);
1582 if (ret
!= ENOENT
) {
1583 DEBUG(10, ("%s: open(%s) failed: %s\n", __func__
,
1584 lockfile_name
.buf
, strerror(ret
)));
1589 lck
.l_type
= F_WRLCK
;
1590 lck
.l_whence
= SEEK_SET
;
1594 ret
= fcntl(fd
, F_SETLK
, &lck
);
1597 if ((ret
!= EACCES
) && (ret
!= EAGAIN
)) {
1598 DEBUG(10, ("%s: Could not get lock: %s\n", __func__
,
1605 DEBUG(10, ("%s: Cleaning up : %s\n", __func__
, strerror(ret
)));
1607 (void)unlink(socket_name
.buf
);
1608 (void)unlink(lockfile_name
.buf
);
1613 static int messaging_dgm_wipe_fn(pid_t pid
, void *private_data
)
1615 pid_t
*our_pid
= (pid_t
*)private_data
;
1618 if (pid
== *our_pid
) {
1620 * fcntl(F_GETLK) will succeed for ourselves, we hold
1621 * that lock ourselves.
1626 ret
= messaging_dgm_cleanup(pid
);
1627 DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
1628 (unsigned long)pid
, ret
? strerror(ret
) : "ok"));
1633 int messaging_dgm_wipe(void)
1635 pid_t pid
= tevent_cached_getpid();
1636 messaging_dgm_forall(messaging_dgm_wipe_fn
, &pid
);
1640 int messaging_dgm_forall(int (*fn
)(pid_t pid
, void *private_data
),
1643 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1652 messaging_dgm_validate(ctx
);
1655 * We scan the socket directory and not the lock directory. Otherwise
1656 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1660 msgdir
= opendir(ctx
->socket_dir
.buf
);
1661 if (msgdir
== NULL
) {
1665 while ((dp
= readdir(msgdir
)) != NULL
) {
1669 pid
= smb_strtoul(dp
->d_name
, NULL
, 10, &error
, SMB_STR_STANDARD
);
1670 if ((pid
== 0) || (error
!= 0)) {
1672 * . and .. and other malformed entries
1677 ret
= fn(pid
, private_data
);
1687 struct messaging_dgm_fde
{
1688 struct tevent_fd
*fde
;
1691 static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev
*fde_ev
)
1693 if (fde_ev
->ctx
!= NULL
) {
1694 DLIST_REMOVE(fde_ev
->ctx
->fde_evs
, fde_ev
);
1701 * Reference counter for a struct tevent_fd messaging read event
1702 * (with callback function) on a struct tevent_context registered
1703 * on a messaging context.
1705 * If we've already registered this struct tevent_context before
1706 * (so already have a read event), just increase the reference count.
1708 * Otherwise create a new struct tevent_fd messaging read event on the
1709 * previously unseen struct tevent_context - this is what drives
1710 * the message receive processing.
1714 struct messaging_dgm_fde
*messaging_dgm_register_tevent_context(
1715 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
)
1717 struct messaging_dgm_context
*ctx
= global_dgm_context
;
1718 struct messaging_dgm_fde_ev
*fde_ev
;
1719 struct messaging_dgm_fde
*fde
;
1725 fde
= talloc(mem_ctx
, struct messaging_dgm_fde
);
1730 for (fde_ev
= ctx
->fde_evs
; fde_ev
!= NULL
; fde_ev
= fde_ev
->next
) {
1731 if (tevent_fd_get_flags(fde_ev
->fde
) == 0) {
1733 * If the event context got deleted,
1734 * tevent_fd_get_flags() will return 0
1735 * for the stale fde.
1737 * In that case we should not
1738 * use fde_ev->ev anymore.
1742 if (fde_ev
->ev
== ev
) {
1747 if (fde_ev
== NULL
) {
1748 fde_ev
= talloc(fde
, struct messaging_dgm_fde_ev
);
1749 if (fde_ev
== NULL
) {
1752 fde_ev
->fde
= tevent_add_fd(
1753 ev
, fde_ev
, ctx
->sock
, TEVENT_FD_READ
,
1754 messaging_dgm_read_handler
, ctx
);
1755 if (fde_ev
->fde
== NULL
) {
1761 DLIST_ADD(ctx
->fde_evs
, fde_ev
);
1762 talloc_set_destructor(
1763 fde_ev
, messaging_dgm_fde_ev_destructor
);
1766 * Same trick as with tdb_wrap: The caller will never
1767 * see the talloc_referenced object, the
1768 * messaging_dgm_fde_ev, so problems with
1769 * talloc_unlink will not happen.
1771 if (talloc_reference(fde
, fde_ev
) == NULL
) {
1777 fde
->fde
= fde_ev
->fde
;
1781 bool messaging_dgm_fde_active(struct messaging_dgm_fde
*fde
)
1788 flags
= tevent_fd_get_flags(fde
->fde
);
1789 return (flags
!= 0);