s3: tests: Add new test_stream_dir_rename.sh test.
[Samba.git] / ctdb / common / sock_io.c
blob 81e82c59ca01166d7714bc18bbb25c396c8f14de
1 /*
2 Generic Unix-domain Socket I/O
4 Copyright (C) Amitay Isaacs 2016
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/network.h"
24 #include <talloc.h>
25 #include <tevent.h>
27 #include "lib/util/sys_rw.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/blocking.h"
31 #include "common/logging.h"
32 #include "common/sock_io.h"
/*
 * Remove a socket file left behind at sockpath.
 *
 * Returns true when the path was removed or did not exist in the first
 * place (ENOENT); returns false when unlink() failed for any other reason.
 */
bool sock_clean(const char *sockpath)
{
	if (unlink(sockpath) == 0) {
		D_WARNING("Removed stale socket %s\n", sockpath);
		return true;
	}

	if (errno == ENOENT) {
		/* Nothing to clean up */
		return true;
	}

	D_ERR("Failed to remove stale socket %s\n", sockpath);
	return false;
}
/*
 * Open a stream Unix-domain socket and connect it to sockpath.
 *
 * Returns the connected descriptor on success, or -1 if sockpath is NULL,
 * does not fit in sockaddr_un.sun_path, or socket()/connect() fails.
 */
int sock_connect(const char *sockpath)
{
	struct sockaddr_un sun;
	size_t pathlen;
	int sock;

	if (sockpath == NULL) {
		D_ERR("Invalid socket path\n");
		return -1;
	}

	memset(&sun, 0, sizeof(sun));
	sun.sun_family = AF_UNIX;

	/* strlcpy() returns the source length, so >= capacity means truncation */
	pathlen = strlcpy(sun.sun_path, sockpath, sizeof(sun.sun_path));
	if (pathlen >= sizeof(sun.sun_path)) {
		D_ERR("Socket path too long, len=%zu\n", strlen(sockpath));
		return -1;
	}

	sock = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock < 0) {
		D_ERR("socket() failed, errno=%d\n", errno);
		return -1;
	}

	if (connect(sock, (struct sockaddr *)&sun, sizeof(sun)) < 0) {
		/* Log before close() so errno still belongs to connect() */
		D_ERR("connect() failed, errno=%d\n", errno);
		close(sock);
		return -1;
	}

	return sock;
}
84 struct sock_queue {
85 struct tevent_context *ev;
86 sock_queue_callback_fn_t callback;
87 void *private_data;
88 int fd;
90 struct tevent_immediate *im;
91 struct tevent_queue *queue;
92 struct tevent_fd *fde;
93 uint8_t *buf;
94 size_t buflen, begin, end;
/*
 * struct sock_queue is allocated as a talloc pooled object.
 *
 * SOCK_QUEUE_OBJ_COUNT is the number of talloc headers reserved in the
 * pool and SOCK_QUEUE_POOL_SIZE is the amount of pool memory
 * pre-allocated for the sub-objects queue->im, queue->queue, queue->fde
 * and queue->buf.
 *
 * If the set of sub-objects allocated under struct sock_queue changes,
 * these two values need to be adjusted to match.
 */
#define SOCK_QUEUE_OBJ_COUNT 4
#define SOCK_QUEUE_POOL_SIZE 2048
/*
 * Forward declarations of the file-local helpers that are referenced
 * before their definitions.
 */

/* Attach fd (or -1 for none) to the queue */
static bool sock_queue_set_fd(struct sock_queue *queue, int fd);

/* tevent fd handler: the socket is readable */
static void sock_queue_handler(struct tevent_context *ev,
			       struct tevent_fd *fde,
			       uint16_t flags,
			       void *private_data);

/* Deliver at most one buffered packet to the callback */
static void sock_queue_process(struct sock_queue *queue);

/* tevent immediate handler: continue processing buffered packets */
static void sock_queue_process_event(struct tevent_context *ev,
				     struct tevent_immediate *im,
				     void *private_data);
117 struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx,
118 struct tevent_context *ev,
119 int fd,
120 sock_queue_callback_fn_t callback,
121 void *private_data)
123 struct sock_queue *queue;
125 queue = talloc_pooled_object(mem_ctx, struct sock_queue,
126 SOCK_QUEUE_OBJ_COUNT, SOCK_QUEUE_POOL_SIZE);
127 if (queue == NULL) {
128 return NULL;
130 memset(queue, 0, sizeof(struct sock_queue));
132 queue->ev = ev;
133 queue->callback = callback;
134 queue->private_data = private_data;
136 queue->im = tevent_create_immediate(queue);
137 if (queue->im == NULL) {
138 talloc_free(queue);
139 return NULL;
142 queue->queue = tevent_queue_create(queue, "out-queue");
143 if (queue->queue == NULL) {
144 talloc_free(queue);
145 return NULL;
148 if (! sock_queue_set_fd(queue, fd)) {
149 talloc_free(queue);
150 return NULL;
153 return queue;
156 static bool sock_queue_set_fd(struct sock_queue *queue, int fd)
158 TALLOC_FREE(queue->fde);
159 queue->fd = fd;
161 if (fd != -1) {
162 int ret;
164 ret = set_blocking(fd, false);
165 if (ret != 0) {
166 return false;
169 queue->fde = tevent_add_fd(queue->ev, queue, fd,
170 TEVENT_FD_READ,
171 sock_queue_handler, queue);
172 if (queue->fde == NULL) {
173 return false;
175 tevent_fd_set_auto_close(queue->fde);
178 return true;
181 static void sock_queue_handler(struct tevent_context *ev,
182 struct tevent_fd *fde, uint16_t flags,
183 void *private_data)
185 struct sock_queue *queue = talloc_get_type_abort(
186 private_data, struct sock_queue);
187 int ret, num_ready;
188 ssize_t nread;
190 ret = ioctl(queue->fd, FIONREAD, &num_ready);
191 if (ret != 0) {
192 /* Ignore */
193 return;
196 if (num_ready == 0) {
197 /* descriptor has been closed */
198 goto fail;
201 if ((size_t)num_ready > queue->buflen - queue->end) {
202 queue->buf = talloc_realloc_size(queue, queue->buf,
203 queue->end + num_ready);
204 if (queue->buf == NULL) {
205 goto fail;
207 queue->buflen = queue->end + num_ready;
210 nread = sys_read(queue->fd, queue->buf + queue->end, num_ready);
211 if (nread < 0) {
212 goto fail;
214 queue->end += nread;
216 sock_queue_process(queue);
217 return;
219 fail:
220 queue->callback(NULL, 0, queue->private_data);
223 static void sock_queue_process(struct sock_queue *queue)
225 uint32_t pkt_size;
227 if ((queue->end - queue->begin) < sizeof(uint32_t)) {
228 /* not enough data */
229 return;
232 pkt_size = *(uint32_t *)(queue->buf + queue->begin);
233 if (pkt_size == 0) {
234 D_ERR("Invalid packet of length 0\n");
235 queue->callback(NULL, 0, queue->private_data);
236 return;
239 if ((queue->end - queue->begin) < pkt_size) {
240 /* not enough data */
241 return;
244 queue->callback(queue->buf + queue->begin, pkt_size,
245 queue->private_data);
246 queue->begin += pkt_size;
248 if (queue->begin < queue->end) {
249 /* more data to be processed */
250 tevent_schedule_immediate(queue->im, queue->ev,
251 sock_queue_process_event, queue);
252 } else {
253 TALLOC_FREE(queue->buf);
254 queue->buflen = 0;
255 queue->begin = 0;
256 queue->end = 0;
260 static void sock_queue_process_event(struct tevent_context *ev,
261 struct tevent_immediate *im,
262 void *private_data)
264 struct sock_queue *queue = talloc_get_type_abort(
265 private_data, struct sock_queue);
267 sock_queue_process(queue);
/*
 * Per-request state of one queued write.
 */
struct sock_queue_write_state {
	/* Data to send; the caller's buffer is referenced, not copied */
	uint8_t *pkt;
	/* Number of bytes to send from pkt */
	uint32_t pkt_size;
};
/* Queue trigger that performs a queued write; defined further down */
static void sock_queue_trigger(struct tevent_req *req,
			       void *private_data);
277 int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen)
279 struct tevent_req *req;
280 struct sock_queue_write_state *state;
281 struct tevent_queue_entry *qentry;
283 if (buflen >= INT32_MAX) {
284 return -1;
287 req = tevent_req_create(queue, &state, struct sock_queue_write_state);
288 if (req == NULL) {
289 return -1;
292 state->pkt = buf;
293 state->pkt_size = (uint32_t)buflen;
295 qentry = tevent_queue_add_entry(queue->queue, queue->ev, req,
296 sock_queue_trigger, queue);
297 if (qentry == NULL) {
298 talloc_free(req);
299 return -1;
302 return 0;
305 static void sock_queue_trigger(struct tevent_req *req, void *private_data)
307 struct sock_queue *queue = talloc_get_type_abort(
308 private_data, struct sock_queue);
309 struct sock_queue_write_state *state = tevent_req_data(
310 req, struct sock_queue_write_state);
311 size_t offset = 0;
313 do {
314 ssize_t nwritten;
316 nwritten = sys_write(queue->fd, state->pkt + offset,
317 state->pkt_size - offset);
318 if (nwritten < 0) {
319 queue->callback(NULL, 0, queue->private_data);
320 return;
322 offset += nwritten;
324 } while (offset < state->pkt_size);
326 tevent_req_done(req);
327 talloc_free(req);