lib:talloc. Fix memory leak when destructors reparent children.
[Samba.git] / lib / async_req / async_sock.c
blobc0ad8f303b8fcd7b8b1f6e2f5f54ca80f405be3a
1 /*
2 Unix SMB/CIFS implementation.
3 async socket syscalls
4 Copyright (C) Volker Lendecke 2008
6 ** NOTE! The following LGPL license applies to the async_sock
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Library General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30 #include "lib/util/iov_buf.h"
32 /* Note: lib/util/ is currently GPL */
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/samba_util.h"
36 struct async_connect_state {
37 int fd;
38 struct tevent_fd *fde;
39 int result;
40 long old_sockflags;
41 socklen_t address_len;
42 struct sockaddr_storage address;
44 void (*before_connect)(void *private_data);
45 void (*after_connect)(void *private_data);
46 void *private_data;
49 static void async_connect_cleanup(struct tevent_req *req,
50 enum tevent_req_state req_state);
51 static void async_connect_connected(struct tevent_context *ev,
52 struct tevent_fd *fde, uint16_t flags,
53 void *priv);
55 /**
56 * @brief async version of connect(2)
57 * @param[in] mem_ctx The memory context to hang the result off
58 * @param[in] ev The event context to work from
59 * @param[in] fd The socket to recv from
60 * @param[in] address Where to connect?
61 * @param[in] address_len Length of *address
62 * @retval The async request
64 * This function sets the socket into non-blocking state to be able to call
65 * connect in an async state. This will be reset when the request is finished.
68 struct tevent_req *async_connect_send(
69 TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
70 const struct sockaddr *address, socklen_t address_len,
71 void (*before_connect)(void *private_data),
72 void (*after_connect)(void *private_data),
73 void *private_data)
75 struct tevent_req *req;
76 struct async_connect_state *state;
77 int ret;
79 req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
80 if (req == NULL) {
81 return NULL;
84 /**
85 * We have to set the socket to nonblocking for async connect(2). Keep
86 * the old sockflags around.
89 state->fd = fd;
90 state->before_connect = before_connect;
91 state->after_connect = after_connect;
92 state->private_data = private_data;
94 state->old_sockflags = fcntl(fd, F_GETFL, 0);
95 if (state->old_sockflags == -1) {
96 tevent_req_error(req, errno);
97 return tevent_req_post(req, ev);
100 tevent_req_set_cleanup_fn(req, async_connect_cleanup);
102 state->address_len = address_len;
103 if (address_len > sizeof(state->address)) {
104 tevent_req_error(req, EINVAL);
105 return tevent_req_post(req, ev);
107 memcpy(&state->address, address, address_len);
109 ret = set_blocking(fd, false);
110 if (ret == -1) {
111 tevent_req_error(req, errno);
112 return tevent_req_post(req, ev);
115 if (state->before_connect != NULL) {
116 state->before_connect(state->private_data);
119 state->result = connect(fd, address, address_len);
121 if (state->after_connect != NULL) {
122 state->after_connect(state->private_data);
125 if (state->result == 0) {
126 tevent_req_done(req);
127 return tevent_req_post(req, ev);
131 * The only errno indicating that the connect is still in
132 * flight is EINPROGRESS, everything else is an error
135 if (errno != EINPROGRESS) {
136 tevent_req_error(req, errno);
137 return tevent_req_post(req, ev);
140 state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE,
141 async_connect_connected, req);
142 if (state->fde == NULL) {
143 tevent_req_error(req, ENOMEM);
144 return tevent_req_post(req, ev);
146 return req;
149 static void async_connect_cleanup(struct tevent_req *req,
150 enum tevent_req_state req_state)
152 struct async_connect_state *state =
153 tevent_req_data(req, struct async_connect_state);
155 TALLOC_FREE(state->fde);
156 if (state->fd != -1) {
157 int ret;
159 ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
160 if (ret == -1) {
161 abort();
164 state->fd = -1;
169 * fde event handler for connect(2)
170 * @param[in] ev The event context that sent us here
171 * @param[in] fde The file descriptor event associated with the connect
172 * @param[in] flags Indicate read/writeability of the socket
173 * @param[in] priv private data, "struct async_req *" in this case
176 static void async_connect_connected(struct tevent_context *ev,
177 struct tevent_fd *fde, uint16_t flags,
178 void *priv)
180 struct tevent_req *req = talloc_get_type_abort(
181 priv, struct tevent_req);
182 struct async_connect_state *state =
183 tevent_req_data(req, struct async_connect_state);
184 int ret;
185 int socket_error = 0;
186 socklen_t slen = sizeof(socket_error);
188 ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
189 &socket_error, &slen);
191 if (ret != 0) {
193 * According to Stevens this is the Solaris behaviour
194 * in case the connection encountered an error:
195 * getsockopt() fails, error is in errno
197 tevent_req_error(req, errno);
198 return;
201 if (socket_error != 0) {
203 * Berkeley derived implementations (including) Linux
204 * return the pending error via socket_error.
206 tevent_req_error(req, socket_error);
207 return;
210 tevent_req_done(req);
211 return;
214 int async_connect_recv(struct tevent_req *req, int *perrno)
216 int err = tevent_req_simple_recv_unix(req);
218 if (err != 0) {
219 *perrno = err;
220 return -1;
223 return 0;
226 struct writev_state {
227 struct tevent_context *ev;
228 int fd;
229 struct tevent_fd *fde;
230 struct iovec *iov;
231 int count;
232 size_t total_size;
233 uint16_t flags;
234 bool err_on_readability;
237 static void writev_cleanup(struct tevent_req *req,
238 enum tevent_req_state req_state);
239 static void writev_trigger(struct tevent_req *req, void *private_data);
240 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
241 uint16_t flags, void *private_data);
243 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
244 struct tevent_queue *queue, int fd,
245 bool err_on_readability,
246 struct iovec *iov, int count)
248 struct tevent_req *req;
249 struct writev_state *state;
251 req = tevent_req_create(mem_ctx, &state, struct writev_state);
252 if (req == NULL) {
253 return NULL;
255 state->ev = ev;
256 state->fd = fd;
257 state->total_size = 0;
258 state->count = count;
259 state->iov = (struct iovec *)talloc_memdup(
260 state, iov, sizeof(struct iovec) * count);
261 if (tevent_req_nomem(state->iov, req)) {
262 return tevent_req_post(req, ev);
264 state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
265 state->err_on_readability = err_on_readability;
267 tevent_req_set_cleanup_fn(req, writev_cleanup);
269 if (queue == NULL) {
270 state->fde = tevent_add_fd(state->ev, state, state->fd,
271 state->flags, writev_handler, req);
272 if (tevent_req_nomem(state->fde, req)) {
273 return tevent_req_post(req, ev);
275 return req;
278 if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
279 tevent_req_oom(req);
280 return tevent_req_post(req, ev);
282 return req;
285 static void writev_cleanup(struct tevent_req *req,
286 enum tevent_req_state req_state)
288 struct writev_state *state = tevent_req_data(req, struct writev_state);
290 TALLOC_FREE(state->fde);
293 static void writev_trigger(struct tevent_req *req, void *private_data)
295 struct writev_state *state = tevent_req_data(req, struct writev_state);
297 state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
298 writev_handler, req);
299 if (tevent_req_nomem(state->fde, req)) {
300 return;
304 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
305 uint16_t flags, void *private_data)
307 struct tevent_req *req = talloc_get_type_abort(
308 private_data, struct tevent_req);
309 struct writev_state *state =
310 tevent_req_data(req, struct writev_state);
311 size_t written;
312 bool ok;
314 if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
315 int ret, value;
317 if (state->err_on_readability) {
318 /* Readable and the caller wants an error on read. */
319 tevent_req_error(req, EPIPE);
320 return;
323 /* Might be an error. Check if there are bytes to read */
324 ret = ioctl(state->fd, FIONREAD, &value);
325 /* FIXME - should we also check
326 for ret == 0 and value == 0 here ? */
327 if (ret == -1) {
328 /* There's an error. */
329 tevent_req_error(req, EPIPE);
330 return;
332 /* A request for TEVENT_FD_READ will succeed from now and
333 forevermore until the bytes are read so if there was
334 an error we'll wait until we do read, then get it in
335 the read callback function. Until then, remove TEVENT_FD_READ
336 from the flags we're waiting for. */
337 state->flags &= ~TEVENT_FD_READ;
338 TEVENT_FD_NOT_READABLE(fde);
340 /* If not writable, we're done. */
341 if (!(flags & TEVENT_FD_WRITE)) {
342 return;
346 written = writev(state->fd, state->iov, state->count);
347 if ((written == -1) && (errno == EINTR)) {
348 /* retry */
349 return;
351 if (written == -1) {
352 tevent_req_error(req, errno);
353 return;
355 if (written == 0) {
356 tevent_req_error(req, EPIPE);
357 return;
359 state->total_size += written;
361 ok = iov_advance(&state->iov, &state->count, written);
362 if (!ok) {
363 tevent_req_error(req, EIO);
364 return;
367 if (state->count == 0) {
368 tevent_req_done(req);
369 return;
373 ssize_t writev_recv(struct tevent_req *req, int *perrno)
375 struct writev_state *state =
376 tevent_req_data(req, struct writev_state);
377 ssize_t ret;
379 if (tevent_req_is_unix_error(req, perrno)) {
380 tevent_req_received(req);
381 return -1;
383 ret = state->total_size;
384 tevent_req_received(req);
385 return ret;
388 struct read_packet_state {
389 int fd;
390 struct tevent_fd *fde;
391 uint8_t *buf;
392 size_t nread;
393 ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
394 void *private_data;
397 static void read_packet_cleanup(struct tevent_req *req,
398 enum tevent_req_state req_state);
399 static void read_packet_handler(struct tevent_context *ev,
400 struct tevent_fd *fde,
401 uint16_t flags, void *private_data);
403 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
404 struct tevent_context *ev,
405 int fd, size_t initial,
406 ssize_t (*more)(uint8_t *buf,
407 size_t buflen,
408 void *private_data),
409 void *private_data)
411 struct tevent_req *req;
412 struct read_packet_state *state;
414 req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
415 if (req == NULL) {
416 return NULL;
418 state->fd = fd;
419 state->nread = 0;
420 state->more = more;
421 state->private_data = private_data;
423 tevent_req_set_cleanup_fn(req, read_packet_cleanup);
425 state->buf = talloc_array(state, uint8_t, initial);
426 if (tevent_req_nomem(state->buf, req)) {
427 return tevent_req_post(req, ev);
430 state->fde = tevent_add_fd(ev, state, fd,
431 TEVENT_FD_READ, read_packet_handler,
432 req);
433 if (tevent_req_nomem(state->fde, req)) {
434 return tevent_req_post(req, ev);
436 return req;
439 static void read_packet_cleanup(struct tevent_req *req,
440 enum tevent_req_state req_state)
442 struct read_packet_state *state =
443 tevent_req_data(req, struct read_packet_state);
445 TALLOC_FREE(state->fde);
448 static void read_packet_handler(struct tevent_context *ev,
449 struct tevent_fd *fde,
450 uint16_t flags, void *private_data)
452 struct tevent_req *req = talloc_get_type_abort(
453 private_data, struct tevent_req);
454 struct read_packet_state *state =
455 tevent_req_data(req, struct read_packet_state);
456 size_t total = talloc_get_size(state->buf);
457 ssize_t nread, more;
458 uint8_t *tmp;
460 nread = recv(state->fd, state->buf+state->nread, total-state->nread,
462 if ((nread == -1) && (errno == ENOTSOCK)) {
463 nread = read(state->fd, state->buf+state->nread,
464 total-state->nread);
466 if ((nread == -1) && (errno == EINTR)) {
467 /* retry */
468 return;
470 if (nread == -1) {
471 tevent_req_error(req, errno);
472 return;
474 if (nread == 0) {
475 tevent_req_error(req, EPIPE);
476 return;
479 state->nread += nread;
480 if (state->nread < total) {
481 /* Come back later */
482 return;
486 * We got what was initially requested. See if "more" asks for -- more.
488 if (state->more == NULL) {
489 /* Nobody to ask, this is a async read_data */
490 tevent_req_done(req);
491 return;
494 more = state->more(state->buf, total, state->private_data);
495 if (more == -1) {
496 /* We got an invalid packet, tell the caller */
497 tevent_req_error(req, EIO);
498 return;
500 if (more == 0) {
501 /* We're done, full packet received */
502 tevent_req_done(req);
503 return;
506 if (total + more < total) {
507 tevent_req_error(req, EMSGSIZE);
508 return;
511 tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
512 if (tevent_req_nomem(tmp, req)) {
513 return;
515 state->buf = tmp;
518 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
519 uint8_t **pbuf, int *perrno)
521 struct read_packet_state *state =
522 tevent_req_data(req, struct read_packet_state);
524 if (tevent_req_is_unix_error(req, perrno)) {
525 tevent_req_received(req);
526 return -1;
528 *pbuf = talloc_move(mem_ctx, &state->buf);
529 tevent_req_received(req);
530 return talloc_get_size(*pbuf);
533 struct wait_for_read_state {
534 struct tevent_fd *fde;
535 int fd;
536 bool check_errors;
539 static void wait_for_read_cleanup(struct tevent_req *req,
540 enum tevent_req_state req_state);
541 static void wait_for_read_done(struct tevent_context *ev,
542 struct tevent_fd *fde,
543 uint16_t flags,
544 void *private_data);
546 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
547 struct tevent_context *ev, int fd,
548 bool check_errors)
550 struct tevent_req *req;
551 struct wait_for_read_state *state;
553 req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
554 if (req == NULL) {
555 return NULL;
558 tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
560 state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
561 wait_for_read_done, req);
562 if (tevent_req_nomem(state->fde, req)) {
563 return tevent_req_post(req, ev);
566 state->fd = fd;
567 state->check_errors = check_errors;
568 return req;
571 static void wait_for_read_cleanup(struct tevent_req *req,
572 enum tevent_req_state req_state)
574 struct wait_for_read_state *state =
575 tevent_req_data(req, struct wait_for_read_state);
577 TALLOC_FREE(state->fde);
580 static void wait_for_read_done(struct tevent_context *ev,
581 struct tevent_fd *fde,
582 uint16_t flags,
583 void *private_data)
585 struct tevent_req *req = talloc_get_type_abort(
586 private_data, struct tevent_req);
587 struct wait_for_read_state *state =
588 tevent_req_data(req, struct wait_for_read_state);
589 ssize_t nread;
590 char c;
592 if ((flags & TEVENT_FD_READ) == 0) {
593 return;
596 if (!state->check_errors) {
597 tevent_req_done(req);
598 return;
601 nread = recv(state->fd, &c, 1, MSG_PEEK);
603 if (nread == 0) {
604 tevent_req_error(req, EPIPE);
605 return;
608 if ((nread == -1) && (errno == EINTR)) {
609 /* come back later */
610 return;
613 if ((nread == -1) && (errno == ENOTSOCK)) {
614 /* Ignore this specific error on pipes */
615 tevent_req_done(req);
616 return;
619 if (nread == -1) {
620 tevent_req_error(req, errno);
621 return;
624 tevent_req_done(req);
627 bool wait_for_read_recv(struct tevent_req *req, int *perr)
629 int err = tevent_req_simple_recv_unix(req);
631 if (err != 0) {
632 *perr = err;
633 return false;
636 return true;