async_req: make async_connect_send() "reentrant"
[Samba.git] / lib / async_req / async_sock.c
blob a9e84d285dda70fa140fecbd5e14c210cdb247e2
1 /*
2 Unix SMB/CIFS implementation.
3 async socket syscalls
4 Copyright (C) Volker Lendecke 2008
6 ** NOTE! The following LGPL license applies to the async_sock
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Library General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30 #include "lib/util/iov_buf.h"
32 /* Note: lib/util/ is currently GPL */
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/samba_util.h"
/**
 * Per-request state of an asynchronous connect(2).
 */
struct async_connect_state {
	/* Socket being connected; set to -1 once cleanup has run. */
	int fd;
	/* Writability watcher installed while the connect is in flight. */
	struct tevent_fd *fde;
	/* Return value of the initial connect(2) call. */
	int result;
	/* File status flags from F_GETFL, restored by the cleanup function. */
	long old_sockflags;
	/* Private copy of the destination address and its length. */
	socklen_t address_len;
	struct sockaddr_storage address;

	/* Optional hooks run right before/after connect(2). */
	void (*before_connect)(void *private_data);
	void (*after_connect)(void *private_data);
	/* Opaque pointer handed to both hooks. */
	void *private_data;
};
/* Request cleanup hook: drops the fd event and restores the socket flags. */
static void async_connect_cleanup(struct tevent_req *req,
				  enum tevent_req_state req_state);

/* fd event handler invoked once the pending connect has been resolved. */
static void async_connect_connected(struct tevent_context *ev,
				    struct tevent_fd *fde,
				    uint16_t flags,
				    void *priv);
55 /**
56 * @brief async version of connect(2)
57 * @param[in] mem_ctx The memory context to hang the result off
58 * @param[in] ev The event context to work from
59 * @param[in] fd The socket to recv from
60 * @param[in] address Where to connect?
61 * @param[in] address_len Length of *address
62 * @retval The async request
64 * This function sets the socket into non-blocking state to be able to call
65 * connect in an async state. This will be reset when the request is finished.
68 struct tevent_req *async_connect_send(
69 TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
70 const struct sockaddr *address, socklen_t address_len,
71 void (*before_connect)(void *private_data),
72 void (*after_connect)(void *private_data),
73 void *private_data)
75 struct tevent_req *req;
76 struct async_connect_state *state;
77 int ret;
79 req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
80 if (req == NULL) {
81 return NULL;
84 /**
85 * We have to set the socket to nonblocking for async connect(2). Keep
86 * the old sockflags around.
89 state->fd = fd;
90 state->before_connect = before_connect;
91 state->after_connect = after_connect;
92 state->private_data = private_data;
94 state->old_sockflags = fcntl(fd, F_GETFL, 0);
95 if (state->old_sockflags == -1) {
96 tevent_req_error(req, errno);
97 return tevent_req_post(req, ev);
100 tevent_req_set_cleanup_fn(req, async_connect_cleanup);
102 state->address_len = address_len;
103 if (address_len > sizeof(state->address)) {
104 tevent_req_error(req, EINVAL);
105 return tevent_req_post(req, ev);
107 memcpy(&state->address, address, address_len);
109 ret = set_blocking(fd, false);
110 if (ret == -1) {
111 tevent_req_error(req, errno);
112 return tevent_req_post(req, ev);
115 if (state->before_connect != NULL) {
116 state->before_connect(state->private_data);
119 state->result = connect(fd, address, address_len);
121 if (state->after_connect != NULL) {
122 state->after_connect(state->private_data);
125 if (state->result == 0) {
126 tevent_req_done(req);
127 return tevent_req_post(req, ev);
131 * The only errno indicating that an initial connect is still
132 * in flight is EINPROGRESS.
134 * We get EALREADY when someone calls us a second time for a
135 * given fd and the connect is still in flight (and returned
136 * EINPROGRESS the first time).
138 * This allows callers like open_socket_out_send() to reuse
139 * fds and call us with an fd for which the connect is still
140 * in flight. The proper thing to do for callers would be
141 * closing the fd and starting from scratch with a fresh
142 * socket.
145 if (errno != EINPROGRESS && errno != EALREADY) {
146 tevent_req_error(req, errno);
147 return tevent_req_post(req, ev);
150 state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE,
151 async_connect_connected, req);
152 if (state->fde == NULL) {
153 tevent_req_error(req, ENOMEM);
154 return tevent_req_post(req, ev);
156 return req;
159 static void async_connect_cleanup(struct tevent_req *req,
160 enum tevent_req_state req_state)
162 struct async_connect_state *state =
163 tevent_req_data(req, struct async_connect_state);
165 TALLOC_FREE(state->fde);
166 if (state->fd != -1) {
167 int ret;
169 ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
170 if (ret == -1) {
171 abort();
174 state->fd = -1;
179 * fde event handler for connect(2)
180 * @param[in] ev The event context that sent us here
181 * @param[in] fde The file descriptor event associated with the connect
182 * @param[in] flags Indicate read/writeability of the socket
183 * @param[in] priv private data, "struct async_req *" in this case
186 static void async_connect_connected(struct tevent_context *ev,
187 struct tevent_fd *fde, uint16_t flags,
188 void *priv)
190 struct tevent_req *req = talloc_get_type_abort(
191 priv, struct tevent_req);
192 struct async_connect_state *state =
193 tevent_req_data(req, struct async_connect_state);
194 int ret;
195 int socket_error = 0;
196 socklen_t slen = sizeof(socket_error);
198 ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
199 &socket_error, &slen);
201 if (ret != 0) {
203 * According to Stevens this is the Solaris behaviour
204 * in case the connection encountered an error:
205 * getsockopt() fails, error is in errno
207 tevent_req_error(req, errno);
208 return;
211 if (socket_error != 0) {
213 * Berkeley derived implementations (including) Linux
214 * return the pending error via socket_error.
216 tevent_req_error(req, socket_error);
217 return;
220 tevent_req_done(req);
221 return;
/**
 * Collect the result of an async connect request.
 *
 * @param[in]  req	The request created by async_connect_send()
 * @param[out] perrno	Receives the unix error code on failure; left
 *			untouched on success
 * @retval 0 on success, -1 on failure
 */
int async_connect_recv(struct tevent_req *req, int *perrno)
{
	int unix_err = tevent_req_simple_recv_unix(req);

	if (unix_err == 0) {
		return 0;
	}

	*perrno = unix_err;
	return -1;
}
/**
 * Per-request state of an asynchronous writev(2).
 */
struct writev_state {
	/* Event context the fd event is registered on. */
	struct tevent_context *ev;
	/* Descriptor being written to. */
	int fd;
	/* fd event; created directly or from the queue trigger. */
	struct tevent_fd *fde;
	/* Private copy of the caller's iovec array; advanced as data goes out. */
	struct iovec *iov;
	/* Number of iovec entries still to be written. */
	int count;
	/* Bytes written so far. */
	size_t total_size;
	/* TEVENT_FD_* events we are currently waiting for. */
	uint16_t flags;
	/* Fail with EPIPE as soon as the fd becomes readable. */
	bool err_on_readability;
};
/* Request cleanup hook: releases the fd event. */
static void writev_cleanup(struct tevent_req *req,
			   enum tevent_req_state req_state);

/* Queue trigger: registers the fd event for a queued request. */
static void writev_trigger(struct tevent_req *req, void *private_data);

/* fd event handler performing the actual writev(2) calls. */
static void writev_handler(struct tevent_context *ev,
			   struct tevent_fd *fde,
			   uint16_t flags,
			   void *private_data);
253 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
254 struct tevent_queue *queue, int fd,
255 bool err_on_readability,
256 struct iovec *iov, int count)
258 struct tevent_req *req;
259 struct writev_state *state;
261 req = tevent_req_create(mem_ctx, &state, struct writev_state);
262 if (req == NULL) {
263 return NULL;
265 state->ev = ev;
266 state->fd = fd;
267 state->total_size = 0;
268 state->count = count;
269 state->iov = (struct iovec *)talloc_memdup(
270 state, iov, sizeof(struct iovec) * count);
271 if (tevent_req_nomem(state->iov, req)) {
272 return tevent_req_post(req, ev);
274 state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
275 state->err_on_readability = err_on_readability;
277 tevent_req_set_cleanup_fn(req, writev_cleanup);
279 if (queue == NULL) {
280 state->fde = tevent_add_fd(state->ev, state, state->fd,
281 state->flags, writev_handler, req);
282 if (tevent_req_nomem(state->fde, req)) {
283 return tevent_req_post(req, ev);
285 return req;
288 if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
289 tevent_req_oom(req);
290 return tevent_req_post(req, ev);
292 return req;
295 static void writev_cleanup(struct tevent_req *req,
296 enum tevent_req_state req_state)
298 struct writev_state *state = tevent_req_data(req, struct writev_state);
300 TALLOC_FREE(state->fde);
303 static void writev_trigger(struct tevent_req *req, void *private_data)
305 struct writev_state *state = tevent_req_data(req, struct writev_state);
307 state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
308 writev_handler, req);
309 if (tevent_req_nomem(state->fde, req)) {
310 return;
314 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
315 uint16_t flags, void *private_data)
317 struct tevent_req *req = talloc_get_type_abort(
318 private_data, struct tevent_req);
319 struct writev_state *state =
320 tevent_req_data(req, struct writev_state);
321 size_t written;
322 bool ok;
324 if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
325 int ret, value;
327 if (state->err_on_readability) {
328 /* Readable and the caller wants an error on read. */
329 tevent_req_error(req, EPIPE);
330 return;
333 /* Might be an error. Check if there are bytes to read */
334 ret = ioctl(state->fd, FIONREAD, &value);
335 /* FIXME - should we also check
336 for ret == 0 and value == 0 here ? */
337 if (ret == -1) {
338 /* There's an error. */
339 tevent_req_error(req, EPIPE);
340 return;
342 /* A request for TEVENT_FD_READ will succeed from now and
343 forevermore until the bytes are read so if there was
344 an error we'll wait until we do read, then get it in
345 the read callback function. Until then, remove TEVENT_FD_READ
346 from the flags we're waiting for. */
347 state->flags &= ~TEVENT_FD_READ;
348 TEVENT_FD_NOT_READABLE(fde);
350 /* If not writable, we're done. */
351 if (!(flags & TEVENT_FD_WRITE)) {
352 return;
356 written = writev(state->fd, state->iov, state->count);
357 if ((written == -1) && (errno == EINTR)) {
358 /* retry */
359 return;
361 if (written == -1) {
362 tevent_req_error(req, errno);
363 return;
365 if (written == 0) {
366 tevent_req_error(req, EPIPE);
367 return;
369 state->total_size += written;
371 ok = iov_advance(&state->iov, &state->count, written);
372 if (!ok) {
373 tevent_req_error(req, EIO);
374 return;
377 if (state->count == 0) {
378 tevent_req_done(req);
379 return;
383 ssize_t writev_recv(struct tevent_req *req, int *perrno)
385 struct writev_state *state =
386 tevent_req_data(req, struct writev_state);
387 ssize_t ret;
389 if (tevent_req_is_unix_error(req, perrno)) {
390 tevent_req_received(req);
391 return -1;
393 ret = state->total_size;
394 tevent_req_received(req);
395 return ret;
/**
 * Per-request state of an asynchronous packet read.
 */
struct read_packet_state {
	/* Descriptor being read from. */
	int fd;
	/* Readability watcher. */
	struct tevent_fd *fde;
	/* Receive buffer; its talloc size is the number of bytes wanted. */
	uint8_t *buf;
	/* Bytes received into buf so far. */
	size_t nread;
	/*
	 * Optional callback asked, once buf is full, how many more bytes
	 * belong to the packet: 0 for none, -1 for an invalid packet.
	 */
	ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
	/* Opaque pointer handed to the "more" callback. */
	void *private_data;
};
/* Request cleanup hook: releases the fd event. */
static void read_packet_cleanup(struct tevent_req *req,
				enum tevent_req_state req_state);

/* fd event handler receiving data into the packet buffer. */
static void read_packet_handler(struct tevent_context *ev,
				struct tevent_fd *fde,
				uint16_t flags,
				void *private_data);
413 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
414 struct tevent_context *ev,
415 int fd, size_t initial,
416 ssize_t (*more)(uint8_t *buf,
417 size_t buflen,
418 void *private_data),
419 void *private_data)
421 struct tevent_req *req;
422 struct read_packet_state *state;
424 req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
425 if (req == NULL) {
426 return NULL;
428 state->fd = fd;
429 state->nread = 0;
430 state->more = more;
431 state->private_data = private_data;
433 tevent_req_set_cleanup_fn(req, read_packet_cleanup);
435 state->buf = talloc_array(state, uint8_t, initial);
436 if (tevent_req_nomem(state->buf, req)) {
437 return tevent_req_post(req, ev);
440 state->fde = tevent_add_fd(ev, state, fd,
441 TEVENT_FD_READ, read_packet_handler,
442 req);
443 if (tevent_req_nomem(state->fde, req)) {
444 return tevent_req_post(req, ev);
446 return req;
449 static void read_packet_cleanup(struct tevent_req *req,
450 enum tevent_req_state req_state)
452 struct read_packet_state *state =
453 tevent_req_data(req, struct read_packet_state);
455 TALLOC_FREE(state->fde);
458 static void read_packet_handler(struct tevent_context *ev,
459 struct tevent_fd *fde,
460 uint16_t flags, void *private_data)
462 struct tevent_req *req = talloc_get_type_abort(
463 private_data, struct tevent_req);
464 struct read_packet_state *state =
465 tevent_req_data(req, struct read_packet_state);
466 size_t total = talloc_get_size(state->buf);
467 ssize_t nread, more;
468 uint8_t *tmp;
470 nread = recv(state->fd, state->buf+state->nread, total-state->nread,
472 if ((nread == -1) && (errno == ENOTSOCK)) {
473 nread = read(state->fd, state->buf+state->nread,
474 total-state->nread);
476 if ((nread == -1) && (errno == EINTR)) {
477 /* retry */
478 return;
480 if (nread == -1) {
481 tevent_req_error(req, errno);
482 return;
484 if (nread == 0) {
485 tevent_req_error(req, EPIPE);
486 return;
489 state->nread += nread;
490 if (state->nread < total) {
491 /* Come back later */
492 return;
496 * We got what was initially requested. See if "more" asks for -- more.
498 if (state->more == NULL) {
499 /* Nobody to ask, this is a async read_data */
500 tevent_req_done(req);
501 return;
504 more = state->more(state->buf, total, state->private_data);
505 if (more == -1) {
506 /* We got an invalid packet, tell the caller */
507 tevent_req_error(req, EIO);
508 return;
510 if (more == 0) {
511 /* We're done, full packet received */
512 tevent_req_done(req);
513 return;
516 if (total + more < total) {
517 tevent_req_error(req, EMSGSIZE);
518 return;
521 tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
522 if (tevent_req_nomem(tmp, req)) {
523 return;
525 state->buf = tmp;
528 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
529 uint8_t **pbuf, int *perrno)
531 struct read_packet_state *state =
532 tevent_req_data(req, struct read_packet_state);
534 if (tevent_req_is_unix_error(req, perrno)) {
535 tevent_req_received(req);
536 return -1;
538 *pbuf = talloc_move(mem_ctx, &state->buf);
539 tevent_req_received(req);
540 return talloc_get_size(*pbuf);
/**
 * Per-request state of wait_for_read.
 */
struct wait_for_read_state {
	/* Readability watcher. */
	struct tevent_fd *fde;
	/* Descriptor being watched. */
	int fd;
	/* Peek at the fd with recv(2) to detect EOF/errors before finishing. */
	bool check_errors;
};
/* Request cleanup hook: releases the fd event. */
static void wait_for_read_cleanup(struct tevent_req *req,
				  enum tevent_req_state req_state);

/* fd event handler called when the descriptor becomes readable. */
static void wait_for_read_done(struct tevent_context *ev,
			       struct tevent_fd *fde,
			       uint16_t flags,
			       void *private_data);
556 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
557 struct tevent_context *ev, int fd,
558 bool check_errors)
560 struct tevent_req *req;
561 struct wait_for_read_state *state;
563 req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
564 if (req == NULL) {
565 return NULL;
568 tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
570 state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
571 wait_for_read_done, req);
572 if (tevent_req_nomem(state->fde, req)) {
573 return tevent_req_post(req, ev);
576 state->fd = fd;
577 state->check_errors = check_errors;
578 return req;
581 static void wait_for_read_cleanup(struct tevent_req *req,
582 enum tevent_req_state req_state)
584 struct wait_for_read_state *state =
585 tevent_req_data(req, struct wait_for_read_state);
587 TALLOC_FREE(state->fde);
590 static void wait_for_read_done(struct tevent_context *ev,
591 struct tevent_fd *fde,
592 uint16_t flags,
593 void *private_data)
595 struct tevent_req *req = talloc_get_type_abort(
596 private_data, struct tevent_req);
597 struct wait_for_read_state *state =
598 tevent_req_data(req, struct wait_for_read_state);
599 ssize_t nread;
600 char c;
602 if ((flags & TEVENT_FD_READ) == 0) {
603 return;
606 if (!state->check_errors) {
607 tevent_req_done(req);
608 return;
611 nread = recv(state->fd, &c, 1, MSG_PEEK);
613 if (nread == 0) {
614 tevent_req_error(req, EPIPE);
615 return;
618 if ((nread == -1) && (errno == EINTR)) {
619 /* come back later */
620 return;
623 if ((nread == -1) && (errno == ENOTSOCK)) {
624 /* Ignore this specific error on pipes */
625 tevent_req_done(req);
626 return;
629 if (nread == -1) {
630 tevent_req_error(req, errno);
631 return;
634 tevent_req_done(req);
/**
 * Collect the result of a wait_for_read request.
 *
 * @param[in]  req	The request created by wait_for_read_send()
 * @param[out] perr	Receives the unix error code on failure; left
 *			untouched on success
 * @retval true on success, false on failure
 */
bool wait_for_read_recv(struct tevent_req *req, int *perr)
{
	int unix_err = tevent_req_simple_recv_unix(req);

	if (unix_err == 0) {
		return true;
	}

	*perr = unix_err;
	return false;
}