2 Generic Unix-domain Socket I/O
4 Copyright (C) Amitay Isaacs 2016
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "system/network.h"
27 #include "lib/util/sys_rw.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/blocking.h"
31 #include "common/logging.h"
32 #include "common/sock_io.h"
34 bool sock_clean(const char *sockpath
)
38 ret
= unlink(sockpath
);
40 D_WARNING("Removed stale socket %s\n", sockpath
);
41 } else if (errno
!= ENOENT
) {
42 D_ERR("Failed to remove stale socket %s\n", sockpath
);
49 int sock_connect(const char *sockpath
)
51 struct sockaddr_un addr
;
55 if (sockpath
== NULL
) {
56 D_ERR("Invalid socket path\n");
60 memset(&addr
, 0, sizeof(addr
));
61 addr
.sun_family
= AF_UNIX
;
62 len
= strlcpy(addr
.sun_path
, sockpath
, sizeof(addr
.sun_path
));
63 if (len
>= sizeof(addr
.sun_path
)) {
64 D_ERR("Socket path too long, len=%zu\n", strlen(sockpath
));
68 fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
70 D_ERR("socket() failed, errno=%d\n", errno
);
74 ret
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
76 D_ERR("connect() failed, errno=%d\n", errno
);
85 struct tevent_context
*ev
;
86 sock_queue_callback_fn_t callback
;
90 struct tevent_immediate
*im
;
91 struct tevent_queue
*queue
;
92 struct tevent_fd
*fde
;
94 size_t buflen
, begin
, end
;
98 * The reserved talloc headers, SOCK_QUEUE_OBJ_COUNT,
99 * and the pre-allocated pool-memory SOCK_QUEUE_POOL_SIZE,
100 * are used for the sub-objects queue->im, queue->queue, queue->fde
102 * If the memory allocating sub-objects of struct sock_queue change,
103 * those values need to be adjusted.
105 #define SOCK_QUEUE_OBJ_COUNT 4
106 #define SOCK_QUEUE_POOL_SIZE 2048
108 static bool sock_queue_set_fd(struct sock_queue
*queue
, int fd
);
109 static void sock_queue_handler(struct tevent_context
*ev
,
110 struct tevent_fd
*fde
, uint16_t flags
,
112 static void sock_queue_process(struct sock_queue
*queue
);
113 static void sock_queue_process_event(struct tevent_context
*ev
,
114 struct tevent_immediate
*im
,
117 struct sock_queue
*sock_queue_setup(TALLOC_CTX
*mem_ctx
,
118 struct tevent_context
*ev
,
120 sock_queue_callback_fn_t callback
,
123 struct sock_queue
*queue
;
125 queue
= talloc_pooled_object(mem_ctx
, struct sock_queue
,
126 SOCK_QUEUE_OBJ_COUNT
, SOCK_QUEUE_POOL_SIZE
);
130 memset(queue
, 0, sizeof(struct sock_queue
));
133 queue
->callback
= callback
;
134 queue
->private_data
= private_data
;
136 queue
->im
= tevent_create_immediate(queue
);
137 if (queue
->im
== NULL
) {
142 queue
->queue
= tevent_queue_create(queue
, "out-queue");
143 if (queue
->queue
== NULL
) {
148 if (! sock_queue_set_fd(queue
, fd
)) {
156 static bool sock_queue_set_fd(struct sock_queue
*queue
, int fd
)
158 TALLOC_FREE(queue
->fde
);
164 ret
= set_blocking(fd
, false);
169 queue
->fde
= tevent_add_fd(queue
->ev
, queue
, fd
,
171 sock_queue_handler
, queue
);
172 if (queue
->fde
== NULL
) {
175 tevent_fd_set_auto_close(queue
->fde
);
181 static void sock_queue_handler(struct tevent_context
*ev
,
182 struct tevent_fd
*fde
, uint16_t flags
,
185 struct sock_queue
*queue
= talloc_get_type_abort(
186 private_data
, struct sock_queue
);
190 ret
= ioctl(queue
->fd
, FIONREAD
, &num_ready
);
196 if (num_ready
== 0) {
197 /* descriptor has been closed */
201 if ((size_t)num_ready
> queue
->buflen
- queue
->end
) {
202 queue
->buf
= talloc_realloc_size(queue
, queue
->buf
,
203 queue
->end
+ num_ready
);
204 if (queue
->buf
== NULL
) {
207 queue
->buflen
= queue
->end
+ num_ready
;
210 nread
= sys_read(queue
->fd
, queue
->buf
+ queue
->end
, num_ready
);
216 sock_queue_process(queue
);
220 queue
->callback(NULL
, 0, queue
->private_data
);
223 static void sock_queue_process(struct sock_queue
*queue
)
227 if ((queue
->end
- queue
->begin
) < sizeof(uint32_t)) {
228 /* not enough data */
232 pkt_size
= *(uint32_t *)(queue
->buf
+ queue
->begin
);
234 D_ERR("Invalid packet of length 0\n");
235 queue
->callback(NULL
, 0, queue
->private_data
);
239 if ((queue
->end
- queue
->begin
) < pkt_size
) {
240 /* not enough data */
244 queue
->callback(queue
->buf
+ queue
->begin
, pkt_size
,
245 queue
->private_data
);
246 queue
->begin
+= pkt_size
;
248 if (queue
->begin
< queue
->end
) {
249 /* more data to be processed */
250 tevent_schedule_immediate(queue
->im
, queue
->ev
,
251 sock_queue_process_event
, queue
);
253 TALLOC_FREE(queue
->buf
);
260 static void sock_queue_process_event(struct tevent_context
*ev
,
261 struct tevent_immediate
*im
,
264 struct sock_queue
*queue
= talloc_get_type_abort(
265 private_data
, struct sock_queue
);
267 sock_queue_process(queue
);
270 struct sock_queue_write_state
{
275 static void sock_queue_trigger(struct tevent_req
*req
, void *private_data
);
277 int sock_queue_write(struct sock_queue
*queue
, uint8_t *buf
, size_t buflen
)
279 struct tevent_req
*req
;
280 struct sock_queue_write_state
*state
;
281 struct tevent_queue_entry
*qentry
;
283 if (buflen
>= INT32_MAX
) {
287 req
= tevent_req_create(queue
, &state
, struct sock_queue_write_state
);
293 state
->pkt_size
= (uint32_t)buflen
;
295 qentry
= tevent_queue_add_entry(queue
->queue
, queue
->ev
, req
,
296 sock_queue_trigger
, queue
);
297 if (qentry
== NULL
) {
305 static void sock_queue_trigger(struct tevent_req
*req
, void *private_data
)
307 struct sock_queue
*queue
= talloc_get_type_abort(
308 private_data
, struct sock_queue
);
309 struct sock_queue_write_state
*state
= tevent_req_data(
310 req
, struct sock_queue_write_state
);
316 nwritten
= sys_write(queue
->fd
, state
->pkt
+ offset
,
317 state
->pkt_size
- offset
);
319 queue
->callback(NULL
, 0, queue
->private_data
);
324 } while (offset
< state
->pkt_size
);
326 tevent_req_done(req
);