2 * Unix SMB/CIFS implementation.
3 * Copyright (C) Volker Lendecke 2013
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "dlinklist.h"
25 #include "pthreadpool/pthreadpool.h"
29 * This file implements two abstractions: The "unix_dgram" functions implement
30 * queueing for unix domain datagram sockets. You can send to a destination
31 * socket, and if that has no free space available, the message is queued on
32 * an anonymous socket connected to that destination and sent from a thread
33 * pool job. "unix_dgram" expects the data size not to exceed the system limit.
35 * The "unix_msg" functions implement the fragmentation of large messages on
36 * top of "unix_dgram". This is what is exposed to the user of this API.
39 struct unix_dgram_msg
{
40 struct unix_dgram_msg
*prev
, *next
;
49 struct unix_dgram_send_queue
{
50 struct unix_dgram_send_queue
*prev
, *next
;
51 struct unix_dgram_ctx
*ctx
;
53 struct unix_dgram_msg
*msgs
;
57 struct unix_dgram_ctx
{
60 const struct poll_funcs
*ev_funcs
;
63 void (*recv_callback
)(struct unix_dgram_ctx
*ctx
,
64 uint8_t *msg
, size_t msg_len
,
68 struct poll_watch
*sock_read_watch
;
69 struct unix_dgram_send_queue
*send_queues
;
71 struct pthreadpool
*send_pool
;
72 struct poll_watch
*pool_read_watch
;
78 static ssize_t
iov_buflen(const struct iovec
*iov
, int iovlen
);
79 static void unix_dgram_recv_handler(struct poll_watch
*w
, int fd
, short events
,
82 /* Set socket non blocking. */
83 static int prepare_socket_nonblock(int sock
)
87 #define FLAG_TO_SET O_NONBLOCK
90 #define FLAG_TO_SET O_NDELAY
92 #define FLAG_TO_SET FNDELAY
96 flags
= fcntl(sock
, F_GETFL
);
100 flags
|= FLAG_TO_SET
;
101 if (fcntl(sock
, F_SETFL
, flags
) == -1) {
109 /* Set socket close on exec. */
110 static int prepare_socket_cloexec(int sock
)
115 flags
= fcntl(sock
, F_GETFD
, 0);
120 if (fcntl(sock
, F_SETFD
, flags
) == -1) {
127 /* Set socket non blocking and close on exec. */
128 static int prepare_socket(int sock
)
130 int ret
= prepare_socket_nonblock(sock
);
135 return prepare_socket_cloexec(sock
);
138 static int unix_dgram_init(const struct sockaddr_un
*addr
, size_t max_msg
,
139 const struct poll_funcs
*ev_funcs
,
140 void (*recv_callback
)(struct unix_dgram_ctx
*ctx
,
141 uint8_t *msg
, size_t msg_len
,
144 struct unix_dgram_ctx
**result
)
146 struct unix_dgram_ctx
*ctx
;
151 pathlen
= strlen(addr
->sun_path
)+1;
156 ctx
= malloc(offsetof(struct unix_dgram_ctx
, path
) + pathlen
);
161 memcpy(ctx
->path
, addr
->sun_path
, pathlen
);
166 ctx
->recv_buf
= malloc(max_msg
);
167 if (ctx
->recv_buf
== NULL
) {
171 ctx
->max_msg
= max_msg
;
172 ctx
->ev_funcs
= ev_funcs
;
173 ctx
->recv_callback
= recv_callback
;
174 ctx
->private_data
= private_data
;
175 ctx
->sock_read_watch
= NULL
;
176 ctx
->send_pool
= NULL
;
177 ctx
->pool_read_watch
= NULL
;
178 ctx
->send_queues
= NULL
;
179 ctx
->created_pid
= (pid_t
)-1;
181 ctx
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
182 if (ctx
->sock
== -1) {
187 /* Set non-blocking and close-on-exec. */
188 ret
= prepare_socket(ctx
->sock
);
194 ret
= bind(ctx
->sock
,
195 (const struct sockaddr
*)(const void *)addr
,
202 ctx
->created_pid
= getpid();
204 ctx
->sock_read_watch
= ctx
->ev_funcs
->watch_new(
205 ctx
->ev_funcs
, ctx
->sock
, POLLIN
,
206 unix_dgram_recv_handler
, ctx
);
208 if (ctx
->sock_read_watch
== NULL
) {
225 static void unix_dgram_recv_handler(struct poll_watch
*w
, int fd
, short events
,
228 struct unix_dgram_ctx
*ctx
= (struct unix_dgram_ctx
*)private_data
;
233 iov
= (struct iovec
) {
234 .iov_base
= (void *)ctx
->recv_buf
,
235 .iov_len
= ctx
->max_msg
,
238 msg
= (struct msghdr
) {
241 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
247 received
= recvmsg(fd
, &msg
, 0);
248 if (received
== -1) {
249 if ((errno
== EAGAIN
) ||
251 (errno
== EWOULDBLOCK
) ||
253 (errno
== EINTR
) || (errno
== ENOMEM
)) {
254 /* Not really an error - just try again. */
257 /* Problem with the socket. Set it unreadable. */
258 ctx
->ev_funcs
->watch_update(w
, 0);
261 if (received
> ctx
->max_msg
) {
262 /* More than we expected, not for us */
265 ctx
->recv_callback(ctx
, ctx
->recv_buf
, received
, ctx
->private_data
);
268 static void unix_dgram_job_finished(struct poll_watch
*w
, int fd
, short events
,
271 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx
*ctx
)
275 if (ctx
->send_pool
!= NULL
) {
279 ret
= pthreadpool_init(0, &ctx
->send_pool
);
284 signalfd
= pthreadpool_signal_fd(ctx
->send_pool
);
286 ctx
->pool_read_watch
= ctx
->ev_funcs
->watch_new(
287 ctx
->ev_funcs
, signalfd
, POLLIN
,
288 unix_dgram_job_finished
, ctx
);
289 if (ctx
->pool_read_watch
== NULL
) {
290 pthreadpool_destroy(ctx
->send_pool
);
291 ctx
->send_pool
= NULL
;
298 static int unix_dgram_send_queue_init(
299 struct unix_dgram_ctx
*ctx
, const struct sockaddr_un
*dst
,
300 struct unix_dgram_send_queue
**result
)
302 struct unix_dgram_send_queue
*q
;
306 pathlen
= strlen(dst
->sun_path
)+1;
308 q
= malloc(offsetof(struct unix_dgram_send_queue
, path
) + pathlen
);
314 memcpy(q
->path
, dst
->sun_path
, pathlen
);
316 q
->sock
= socket(AF_UNIX
, SOCK_DGRAM
, 0);
322 err
= prepare_socket_cloexec(q
->sock
);
328 ret
= connect(q
->sock
,
329 (const struct sockaddr
*)(const void *)dst
,
331 } while ((ret
== -1) && (errno
== EINTR
));
338 err
= unix_dgram_init_pthreadpool(ctx
);
343 DLIST_ADD(ctx
->send_queues
, q
);
355 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue
*q
)
357 struct unix_dgram_ctx
*ctx
= q
->ctx
;
359 while (q
->msgs
!= NULL
) {
360 struct unix_dgram_msg
*msg
;
362 DLIST_REMOVE(q
->msgs
, msg
);
366 DLIST_REMOVE(ctx
->send_queues
, q
);
370 static struct unix_dgram_send_queue
*find_send_queue(
371 struct unix_dgram_ctx
*ctx
, const char *dst_sock
)
373 struct unix_dgram_send_queue
*s
;
375 for (s
= ctx
->send_queues
; s
!= NULL
; s
= s
->next
) {
376 if (strcmp(s
->path
, dst_sock
) == 0) {
383 static int queue_msg(struct unix_dgram_send_queue
*q
,
384 const struct iovec
*iov
, int iovlen
)
386 struct unix_dgram_msg
*msg
;
391 buflen
= iov_buflen(iov
, iovlen
);
396 msglen
= offsetof(struct unix_dgram_msg
, buf
) + buflen
;
397 if ((msglen
< buflen
) ||
398 (msglen
< offsetof(struct unix_dgram_msg
, buf
))) {
403 msg
= malloc(msglen
);
407 msg
->buflen
= buflen
;
411 for (i
=0; i
<iovlen
; i
++) {
412 memcpy(&msg
->buf
[buflen
], iov
[i
].iov_base
, iov
[i
].iov_len
);
413 buflen
+= iov
[i
].iov_len
;
416 DLIST_ADD_END(q
->msgs
, msg
, struct unix_dgram_msg
);
420 static void unix_dgram_send_job(void *private_data
)
422 struct unix_dgram_msg
*msg
= private_data
;
425 msg
->sent
= send(msg
->sock
, msg
->buf
, msg
->buflen
, 0);
426 } while ((msg
->sent
== -1) && (errno
== EINTR
));
429 static void unix_dgram_job_finished(struct poll_watch
*w
, int fd
, short events
,
432 struct unix_dgram_ctx
*ctx
= private_data
;
433 struct unix_dgram_send_queue
*q
;
434 struct unix_dgram_msg
*msg
;
437 ret
= pthreadpool_finished_jobs(ctx
->send_pool
, &job
, 1);
442 for (q
= ctx
->send_queues
; q
!= NULL
; q
= q
->next
) {
443 if (job
== q
->sock
) {
449 /* Huh? Should not happen */
454 DLIST_REMOVE(q
->msgs
, msg
);
457 if (q
->msgs
!= NULL
) {
458 ret
= pthreadpool_add_job(ctx
->send_pool
, q
->sock
,
459 unix_dgram_send_job
, q
->msgs
);
465 unix_dgram_send_queue_free(q
);
468 static int unix_dgram_send(struct unix_dgram_ctx
*ctx
,
469 const struct sockaddr_un
*dst
,
470 const struct iovec
*iov
, int iovlen
)
472 struct unix_dgram_send_queue
*q
;
477 * To preserve message ordering, we have to queue a message when
478 * others are waiting in line already.
480 q
= find_send_queue(ctx
, dst
->sun_path
);
482 return queue_msg(q
, iov
, iovlen
);
486 * Try a cheap nonblocking send
489 msg
.msg_name
= discard_const_p(struct sockaddr_un
, dst
);
490 msg
.msg_namelen
= sizeof(*dst
);
491 msg
.msg_iov
= discard_const_p(struct iovec
, iov
);
492 msg
.msg_iovlen
= iovlen
;
493 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
494 msg
.msg_control
= NULL
;
495 msg
.msg_controllen
= 0;
499 ret
= sendmsg(ctx
->sock
, &msg
, 0);
504 if ((errno
!= EWOULDBLOCK
) && (errno
!= EAGAIN
) && (errno
!= EINTR
)) {
506 if ((errno
!= EAGAIN
) && (errno
!= EINTR
)) {
511 ret
= unix_dgram_send_queue_init(ctx
, dst
, &q
);
515 ret
= queue_msg(q
, iov
, iovlen
);
517 unix_dgram_send_queue_free(q
);
520 ret
= pthreadpool_add_job(ctx
->send_pool
, q
->sock
,
521 unix_dgram_send_job
, q
->msgs
);
523 unix_dgram_send_queue_free(q
);
529 static int unix_dgram_sock(struct unix_dgram_ctx
*ctx
)
534 static int unix_dgram_free(struct unix_dgram_ctx
*ctx
)
536 if (ctx
->send_queues
!= NULL
) {
540 if (ctx
->send_pool
!= NULL
) {
541 int ret
= pthreadpool_destroy(ctx
->send_pool
);
545 ctx
->ev_funcs
->watch_free(ctx
->pool_read_watch
);
548 ctx
->ev_funcs
->watch_free(ctx
->sock_read_watch
);
550 if (getpid() == ctx
->created_pid
) {
551 /* If we created it, unlink. Otherwise someone else might
552 * still have it open */
563 * Every message starts with a uint64_t cookie.
565 * A value of 0 indicates a single-fragment message which is complete in
566 * itself. The data immediately follows the cookie.
568 * Every multi-fragment message has a cookie != 0 and starts with a cookie
569 * followed by a struct unix_msg_hdr and then the data. The pid and sock
570 * fields are used to ensure uniqueness on the receiver side.
573 struct unix_msg_hdr
{
580 struct unix_msg
*prev
, *next
;
589 struct unix_msg_ctx
{
590 struct unix_dgram_ctx
*dgram
;
594 void (*recv_callback
)(struct unix_msg_ctx
*ctx
,
595 uint8_t *msg
, size_t msg_len
,
599 struct unix_msg
*msgs
;
602 static void unix_msg_recv(struct unix_dgram_ctx
*ctx
,
603 uint8_t *msg
, size_t msg_len
,
606 int unix_msg_init(const char *path
, const struct poll_funcs
*ev_funcs
,
607 size_t fragment_len
, uint64_t cookie
,
608 void (*recv_callback
)(struct unix_msg_ctx
*ctx
,
609 uint8_t *msg
, size_t msg_len
,
612 struct unix_msg_ctx
**result
)
614 struct unix_msg_ctx
*ctx
;
615 struct sockaddr_un addr
;
616 struct sockaddr_un
*paddr
= NULL
;
619 ctx
= malloc(sizeof(*ctx
));
625 size_t pathlen
= strlen(path
)+1;
627 if (pathlen
> sizeof(addr
.sun_path
)) {
630 addr
= (struct sockaddr_un
) { .sun_family
= AF_UNIX
};
631 memcpy(addr
.sun_path
, path
, pathlen
);
635 ret
= unix_dgram_init(paddr
, fragment_len
, ev_funcs
,
636 unix_msg_recv
, ctx
, &ctx
->dgram
);
642 ctx
->fragment_len
= fragment_len
;
643 ctx
->cookie
= cookie
;
644 ctx
->recv_callback
= recv_callback
;
645 ctx
->private_data
= private_data
;
652 int unix_msg_send(struct unix_msg_ctx
*ctx
, const char *dst_sock
,
653 const struct iovec
*iov
, int iovlen
)
658 struct iovec
*iov_copy
;
659 struct unix_msg_hdr hdr
;
660 struct iovec src_iov
;
661 struct sockaddr_un dst
;
664 dst_len
= strlen(dst_sock
);
665 if (dst_len
>= sizeof(dst
.sun_path
)) {
668 dst
= (struct sockaddr_un
) { .sun_family
= AF_UNIX
};
669 memcpy(dst
.sun_path
, dst_sock
, dst_len
);
675 msglen
= iov_buflen(iov
, iovlen
);
680 if (msglen
<= (ctx
->fragment_len
- sizeof(uint64_t))) {
681 struct iovec tmp_iov
[iovlen
+1];
684 tmp_iov
[0].iov_base
= &cookie
;
685 tmp_iov
[0].iov_len
= sizeof(cookie
);
687 memcpy(&tmp_iov
[1], iov
,
688 sizeof(struct iovec
) * iovlen
);
691 return unix_dgram_send(ctx
->dgram
, &dst
, tmp_iov
, iovlen
+1);
696 hdr
.sock
= unix_dgram_sock(ctx
->dgram
);
698 iov_copy
= malloc(sizeof(struct iovec
) * (iovlen
+ 2));
699 if (iov_copy
== NULL
) {
702 iov_copy
[0].iov_base
= &ctx
->cookie
;
703 iov_copy
[0].iov_len
= sizeof(ctx
->cookie
);
704 iov_copy
[1].iov_base
= &hdr
;
705 iov_copy
[1].iov_len
= sizeof(hdr
);
711 * The following write loop sends the user message in pieces. We have
712 * filled the first two iovecs above with "cookie" and "hdr". In the
713 * following loops we pull message chunks from the user iov array and
714 * fill iov_copy piece by piece, possibly truncating chunks from the
715 * caller's iov array. Ugly, but hopefully efficient.
718 while (sent
< msglen
) {
720 size_t iov_index
= 2;
722 fragment_len
= sizeof(ctx
->cookie
) + sizeof(hdr
);
724 while (fragment_len
< ctx
->fragment_len
) {
727 space
= ctx
->fragment_len
- fragment_len
;
728 chunk
= MIN(space
, src_iov
.iov_len
);
730 iov_copy
[iov_index
].iov_base
= src_iov
.iov_base
;
731 iov_copy
[iov_index
].iov_len
= chunk
;
734 src_iov
.iov_base
= (char *)src_iov
.iov_base
+ chunk
;
735 src_iov
.iov_len
-= chunk
;
736 fragment_len
+= chunk
;
738 if (src_iov
.iov_len
== 0) {
747 sent
+= (fragment_len
- sizeof(ctx
->cookie
) - sizeof(hdr
));
749 ret
= unix_dgram_send(ctx
->dgram
, &dst
, iov_copy
, iov_index
);
758 if (ctx
->cookie
== 0) {
765 static void unix_msg_recv(struct unix_dgram_ctx
*dgram_ctx
,
766 uint8_t *buf
, size_t buflen
,
769 struct unix_msg_ctx
*ctx
= (struct unix_msg_ctx
*)private_data
;
770 struct unix_msg_hdr hdr
;
771 struct unix_msg
*msg
;
775 if (buflen
< sizeof(cookie
)) {
778 memcpy(&cookie
, buf
, sizeof(cookie
));
780 buf
+= sizeof(cookie
);
781 buflen
-= sizeof(cookie
);
784 ctx
->recv_callback(ctx
, buf
, buflen
, ctx
->private_data
);
788 if (buflen
< sizeof(hdr
)) {
791 memcpy(&hdr
, buf
, sizeof(hdr
));
794 buflen
-= sizeof(hdr
);
796 for (msg
= ctx
->msgs
; msg
!= NULL
; msg
= msg
->next
) {
797 if ((msg
->sender_pid
== hdr
.pid
) &&
798 (msg
->sender_sock
== hdr
.sock
)) {
803 if ((msg
!= NULL
) && (msg
->cookie
!= cookie
)) {
804 DLIST_REMOVE(ctx
->msgs
, msg
);
810 msg
= malloc(offsetof(struct unix_msg
, buf
) + hdr
.msglen
);
814 msg
->msglen
= hdr
.msglen
;
816 msg
->sender_pid
= hdr
.pid
;
817 msg
->sender_sock
= hdr
.sock
;
818 msg
->cookie
= cookie
;
819 DLIST_ADD(ctx
->msgs
, msg
);
822 space
= msg
->msglen
- msg
->received
;
823 if (buflen
> space
) {
827 memcpy(msg
->buf
+ msg
->received
, buf
, buflen
);
828 msg
->received
+= buflen
;
830 if (msg
->received
< msg
->msglen
) {
834 DLIST_REMOVE(ctx
->msgs
, msg
);
835 ctx
->recv_callback(ctx
, msg
->buf
, msg
->msglen
, ctx
->private_data
);
839 int unix_msg_free(struct unix_msg_ctx
*ctx
)
843 ret
= unix_dgram_free(ctx
->dgram
);
848 while (ctx
->msgs
!= NULL
) {
849 struct unix_msg
*msg
= ctx
->msgs
;
850 DLIST_REMOVE(ctx
->msgs
, msg
);
858 static ssize_t
iov_buflen(const struct iovec
*iov
, int iovlen
)
863 for (i
=0; i
<iovlen
; i
++) {
864 size_t thislen
= iov
[i
].iov_len
;
865 size_t tmp
= buflen
+ thislen
;
867 if ((tmp
< buflen
) || (tmp
< thislen
)) {