Add more conventional async_recv
[Samba/gebeck_regimport.git] / lib / async_req / async_sock.c
blob3563421e0e56c62363ab752e55db3ea788db2136
1 /*
2 Unix SMB/CIFS implementation.
3 async socket syscalls
4 Copyright (C) Volker Lendecke 2008
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "lib/talloc/talloc.h"
22 #include "lib/tevent/tevent.h"
23 #include "lib/async_req/async_req.h"
24 #include "lib/async_req/async_sock.h"
25 #include "lib/util/tevent_unix.h"
26 #include <fcntl.h>
28 #ifndef TALLOC_FREE
29 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
30 #endif
32 /**
33 * Discriminator for async_syscall_state
35 enum async_syscall_type {
36 ASYNC_SYSCALL_SEND,
37 ASYNC_SYSCALL_RECV,
40 /**
41 * Holder for syscall arguments and the result
44 struct async_syscall_state {
45 enum async_syscall_type syscall_type;
46 struct tevent_fd *fde;
48 union {
49 struct param_send {
50 int fd;
51 const void *buffer;
52 size_t length;
53 int flags;
54 } param_send;
55 struct param_recv {
56 int fd;
57 void *buffer;
58 size_t length;
59 int flags;
60 } param_recv;
61 } param;
63 union {
64 ssize_t result_ssize_t;
65 size_t result_size_t;
66 int result_int;
67 } result;
68 int sys_errno;
71 /**
72 * @brief Map async_req states to unix-style errnos
73 * @param[in] req The async req to get the state from
74 * @param[out] err Pointer to take the unix-style errno
76 * @return true if the async_req is in an error state, false otherwise
79 bool async_req_is_errno(struct async_req *req, int *err)
81 enum async_req_state state;
82 uint64_t error;
84 if (!async_req_is_error(req, &state, &error)) {
85 return false;
88 switch (state) {
89 case ASYNC_REQ_USER_ERROR:
90 *err = (int)error;
91 break;
92 case ASYNC_REQ_TIMED_OUT:
93 #ifdef ETIMEDOUT
94 *err = ETIMEDOUT;
95 #else
96 *err = EAGAIN;
97 #endif
98 break;
99 case ASYNC_REQ_NO_MEMORY:
100 *err = ENOMEM;
101 break;
102 default:
103 *err = EIO;
104 break;
106 return true;
109 int async_req_simple_recv_errno(struct async_req *req)
111 int err;
113 if (async_req_is_errno(req, &err)) {
114 return err;
117 return 0;
121 * @brief Create a new async syscall req
122 * @param[in] mem_ctx The memory context to hang the result off
123 * @param[in] ev The event context to work from
124 * @param[in] type Which syscall will this be
125 * @param[in] pstate Where to put the newly created private_data state
126 * @retval The new request
128 * This is a helper function to prepare a new struct async_req with an
129 * associated struct async_syscall_state. The async_syscall_state will be put
130 * into the async_req as private_data.
133 static struct async_req *async_syscall_new(TALLOC_CTX *mem_ctx,
134 struct tevent_context *ev,
135 enum async_syscall_type type,
136 struct async_syscall_state **pstate)
138 struct async_req *result;
139 struct async_syscall_state *state;
141 if (!async_req_setup(mem_ctx, &result, &state,
142 struct async_syscall_state)) {
143 return NULL;
145 state->syscall_type = type;
147 result->private_data = state;
149 *pstate = state;
151 return result;
155 * @brief Create a new async syscall req based on a fd
156 * @param[in] mem_ctx The memory context to hang the result off
157 * @param[in] ev The event context to work from
158 * @param[in] type Which syscall will this be
159 * @param[in] fd The file descriptor we work on
160 * @param[in] fde_flags TEVENT_FD_READ/WRITE -- what are we interested in?
161 * @param[in] fde_cb The callback function for the file descriptor event
162 * @param[in] pstate Where to put the newly created private_data state
163 * @retval The new request
165 * This is a helper function to prepare a new struct async_req with an
166 * associated struct async_syscall_state and an associated file descriptor
167 * event.
170 static struct async_req *async_fde_syscall_new(
171 TALLOC_CTX *mem_ctx,
172 struct tevent_context *ev,
173 enum async_syscall_type type,
174 int fd,
175 uint16_t fde_flags,
176 void (*fde_cb)(struct tevent_context *ev,
177 struct tevent_fd *fde, uint16_t flags,
178 void *priv),
179 struct async_syscall_state **pstate)
181 struct async_req *result;
182 struct async_syscall_state *state;
184 result = async_syscall_new(mem_ctx, ev, type, &state);
185 if (result == NULL) {
186 return NULL;
189 state->fde = tevent_add_fd(ev, state, fd, fde_flags, fde_cb, result);
190 if (state->fde == NULL) {
191 TALLOC_FREE(result);
192 return NULL;
194 *pstate = state;
195 return result;
199 * Retrieve a ssize_t typed result from an async syscall
200 * @param[in] req The syscall that has just finished
201 * @param[out] perrno Where to put the syscall's errno
202 * @retval The return value from the asynchronously called syscall
205 ssize_t async_syscall_result_ssize_t(struct async_req *req, int *perrno)
207 struct async_syscall_state *state = talloc_get_type_abort(
208 req->private_data, struct async_syscall_state);
210 *perrno = state->sys_errno;
211 return state->result.result_ssize_t;
215 * Retrieve a size_t typed result from an async syscall
216 * @param[in] req The syscall that has just finished
217 * @param[out] perrno Where to put the syscall's errno
218 * @retval The return value from the asynchronously called syscall
221 size_t async_syscall_result_size_t(struct async_req *req, int *perrno)
223 struct async_syscall_state *state = talloc_get_type_abort(
224 req->private_data, struct async_syscall_state);
226 *perrno = state->sys_errno;
227 return state->result.result_size_t;
231 * Retrieve a int typed result from an async syscall
232 * @param[in] req The syscall that has just finished
233 * @param[out] perrno Where to put the syscall's errno
234 * @retval The return value from the asynchronously called syscall
237 int async_syscall_result_int(struct async_req *req, int *perrno)
239 struct async_syscall_state *state = talloc_get_type_abort(
240 req->private_data, struct async_syscall_state);
242 *perrno = state->sys_errno;
243 return state->result.result_int;
247 * fde event handler for the "send" syscall
248 * @param[in] ev The event context that sent us here
249 * @param[in] fde The file descriptor event associated with the send
250 * @param[in] flags Can only be TEVENT_FD_WRITE here
251 * @param[in] priv private data, "struct async_req *" in this case
254 static void async_send_callback(struct tevent_context *ev,
255 struct tevent_fd *fde, uint16_t flags,
256 void *priv)
258 struct async_req *req = talloc_get_type_abort(
259 priv, struct async_req);
260 struct async_syscall_state *state = talloc_get_type_abort(
261 req->private_data, struct async_syscall_state);
262 struct param_send *p = &state->param.param_send;
264 if (state->syscall_type != ASYNC_SYSCALL_SEND) {
265 async_req_error(req, EIO);
266 return;
269 state->result.result_ssize_t = send(p->fd, p->buffer, p->length,
270 p->flags);
271 state->sys_errno = errno;
273 TALLOC_FREE(state->fde);
275 async_req_done(req);
279 * Async version of send(2)
280 * @param[in] mem_ctx The memory context to hang the result off
281 * @param[in] ev The event context to work from
282 * @param[in] fd The socket to send to
283 * @param[in] buffer The buffer to send
284 * @param[in] length How many bytes to send
285 * @param[in] flags flags passed to send(2)
287 * This function is a direct counterpart of send(2)
290 struct async_req *async_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
291 int fd, const void *buffer, size_t length,
292 int flags)
294 struct async_req *result;
295 struct async_syscall_state *state;
297 result = async_fde_syscall_new(
298 mem_ctx, ev, ASYNC_SYSCALL_SEND,
299 fd, TEVENT_FD_WRITE, async_send_callback,
300 &state);
301 if (result == NULL) {
302 return NULL;
305 state->param.param_send.fd = fd;
306 state->param.param_send.buffer = buffer;
307 state->param.param_send.length = length;
308 state->param.param_send.flags = flags;
310 return result;
314 * fde event handler for the "recv" syscall
315 * @param[in] ev The event context that sent us here
316 * @param[in] fde The file descriptor event associated with the recv
317 * @param[in] flags Can only be TEVENT_FD_READ here
318 * @param[in] priv private data, "struct async_req *" in this case
321 static void async_recv_callback(struct tevent_context *ev,
322 struct tevent_fd *fde, uint16_t flags,
323 void *priv)
325 struct async_req *req = talloc_get_type_abort(
326 priv, struct async_req);
327 struct async_syscall_state *state = talloc_get_type_abort(
328 req->private_data, struct async_syscall_state);
329 struct param_recv *p = &state->param.param_recv;
331 if (state->syscall_type != ASYNC_SYSCALL_RECV) {
332 async_req_error(req, EIO);
333 return;
336 state->result.result_ssize_t = recv(p->fd, p->buffer, p->length,
337 p->flags);
338 state->sys_errno = errno;
340 TALLOC_FREE(state->fde);
342 async_req_done(req);
346 * Async version of recv(2)
347 * @param[in] mem_ctx The memory context to hang the result off
348 * @param[in] ev The event context to work from
349 * @param[in] fd The socket to recv from
350 * @param[in] buffer The buffer to recv into
351 * @param[in] length How many bytes to recv
352 * @param[in] flags flags passed to recv(2)
354 * This function is a direct counterpart of recv(2)
357 struct async_req *async_recv(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
358 int fd, void *buffer, size_t length,
359 int flags)
361 struct async_req *result;
362 struct async_syscall_state *state;
364 result = async_fde_syscall_new(
365 mem_ctx, ev, ASYNC_SYSCALL_RECV,
366 fd, TEVENT_FD_READ, async_recv_callback,
367 &state);
369 if (result == NULL) {
370 return NULL;
373 state->param.param_recv.fd = fd;
374 state->param.param_recv.buffer = buffer;
375 state->param.param_recv.length = length;
376 state->param.param_recv.flags = flags;
378 return result;
381 struct async_send_state {
382 int fd;
383 const void *buf;
384 size_t len;
385 int flags;
386 ssize_t sent;
389 static void async_send_handler(struct tevent_context *ev,
390 struct tevent_fd *fde,
391 uint16_t flags, void *private_data);
393 struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
394 struct tevent_context *ev,
395 int fd, const void *buf, size_t len,
396 int flags)
398 struct tevent_req *result;
399 struct async_send_state *state;
400 struct tevent_fd *fde;
402 result = tevent_req_create(mem_ctx, &state, struct async_send_state);
403 if (result == NULL) {
404 return result;
406 state->fd = fd;
407 state->buf = buf;
408 state->len = len;
409 state->flags = flags;
411 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
412 result);
413 if (fde == NULL) {
414 TALLOC_FREE(result);
415 return NULL;
417 return result;
420 static void async_send_handler(struct tevent_context *ev,
421 struct tevent_fd *fde,
422 uint16_t flags, void *private_data)
424 struct tevent_req *req = talloc_get_type_abort(
425 private_data, struct tevent_req);
426 struct async_send_state *state = talloc_get_type_abort(
427 req->private_state, struct async_send_state);
429 state->sent = send(state->fd, state->buf, state->len, state->flags);
430 if (state->sent == -1) {
431 tevent_req_error(req, errno);
432 return;
434 tevent_req_done(req);
437 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
439 struct async_send_state *state = talloc_get_type_abort(
440 req->private_state, struct async_send_state);
442 if (tevent_req_is_unix_error(req, perrno)) {
443 return -1;
445 return state->sent;
448 struct async_recv_state {
449 int fd;
450 void *buf;
451 size_t len;
452 int flags;
453 ssize_t received;
456 static void async_recv_handler(struct tevent_context *ev,
457 struct tevent_fd *fde,
458 uint16_t flags, void *private_data);
460 struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
461 struct tevent_context *ev,
462 int fd, void *buf, size_t len, int flags)
464 struct tevent_req *result;
465 struct async_recv_state *state;
466 struct tevent_fd *fde;
468 result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
469 if (result == NULL) {
470 return result;
472 state->fd = fd;
473 state->buf = buf;
474 state->len = len;
475 state->flags = flags;
477 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
478 result);
479 if (fde == NULL) {
480 TALLOC_FREE(result);
481 return NULL;
483 return result;
486 static void async_recv_handler(struct tevent_context *ev,
487 struct tevent_fd *fde,
488 uint16_t flags, void *private_data)
490 struct tevent_req *req = talloc_get_type_abort(
491 private_data, struct tevent_req);
492 struct async_recv_state *state = talloc_get_type_abort(
493 req->private_state, struct async_recv_state);
495 state->received = recv(state->fd, state->buf, state->len,
496 state->flags);
497 if (state->received == -1) {
498 tevent_req_error(req, errno);
499 return;
501 tevent_req_done(req);
504 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
506 struct async_recv_state *state = talloc_get_type_abort(
507 req->private_state, struct async_recv_state);
509 if (tevent_req_is_unix_error(req, perrno)) {
510 return -1;
512 return state->received;
515 struct async_connect_state {
516 int fd;
517 int result;
518 int sys_errno;
519 long old_sockflags;
522 static void async_connect_connected(struct tevent_context *ev,
523 struct tevent_fd *fde, uint16_t flags,
524 void *priv);
527 * @brief async version of connect(2)
528 * @param[in] mem_ctx The memory context to hang the result off
529 * @param[in] ev The event context to work from
530 * @param[in] fd The socket to recv from
531 * @param[in] address Where to connect?
532 * @param[in] address_len Length of *address
533 * @retval The async request
535 * This function sets the socket into non-blocking state to be able to call
536 * connect in an async state. This will be reset when the request is finished.
539 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
540 struct tevent_context *ev,
541 int fd, const struct sockaddr *address,
542 socklen_t address_len)
544 struct tevent_req *result;
545 struct async_connect_state *state;
546 struct tevent_fd *fde;
548 result = tevent_req_create(
549 mem_ctx, &state, struct async_connect_state);
550 if (result == NULL) {
551 return NULL;
555 * We have to set the socket to nonblocking for async connect(2). Keep
556 * the old sockflags around.
559 state->fd = fd;
560 state->sys_errno = 0;
562 state->old_sockflags = fcntl(fd, F_GETFL, 0);
563 if (state->old_sockflags == -1) {
564 goto post_errno;
567 set_blocking(fd, false);
569 state->result = connect(fd, address, address_len);
570 if (state->result == 0) {
571 errno = 0;
572 goto post_errno;
576 * A number of error messages show that something good is progressing
577 * and that we have to wait for readability.
579 * If none of them are present, bail out.
582 if (!(errno == EINPROGRESS || errno == EALREADY ||
583 #ifdef EISCONN
584 errno == EISCONN ||
585 #endif
586 errno == EAGAIN || errno == EINTR)) {
587 goto post_errno;
590 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
591 async_connect_connected, result);
592 if (fde == NULL) {
593 errno = ENOMEM;
594 goto post_errno;
596 return result;
598 post_errno:
599 state->sys_errno = errno;
600 fcntl(fd, F_SETFL, state->old_sockflags);
601 if (state->sys_errno == 0) {
602 tevent_req_done(result);
603 } else {
604 tevent_req_error(result, state->sys_errno);
606 return tevent_req_post(result, ev);
610 * fde event handler for connect(2)
611 * @param[in] ev The event context that sent us here
612 * @param[in] fde The file descriptor event associated with the connect
613 * @param[in] flags Indicate read/writeability of the socket
614 * @param[in] priv private data, "struct async_req *" in this case
617 static void async_connect_connected(struct tevent_context *ev,
618 struct tevent_fd *fde, uint16_t flags,
619 void *priv)
621 struct tevent_req *req = talloc_get_type_abort(
622 priv, struct tevent_req);
623 struct async_connect_state *state = talloc_get_type_abort(
624 req->private_state, struct async_connect_state);
626 TALLOC_FREE(fde);
629 * Stevens, Network Programming says that if there's a
630 * successful connect, the socket is only writable. Upon an
631 * error, it's both readable and writable.
633 if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
634 == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
635 int sockerr;
636 socklen_t err_len = sizeof(sockerr);
638 if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
639 (void *)&sockerr, &err_len) == 0) {
640 errno = sockerr;
643 state->sys_errno = errno;
645 DEBUG(10, ("connect returned %s\n", strerror(errno)));
647 fcntl(state->fd, F_SETFL, state->old_sockflags);
648 tevent_req_error(req, state->sys_errno);
649 return;
652 state->sys_errno = 0;
653 tevent_req_done(req);
656 int async_connect_recv(struct tevent_req *req, int *perrno)
658 struct async_connect_state *state = talloc_get_type_abort(
659 req->private_state, struct async_connect_state);
660 int err;
662 fcntl(state->fd, F_SETFL, state->old_sockflags);
664 if (tevent_req_is_unix_error(req, &err)) {
665 *perrno = err;
666 return -1;
669 if (state->sys_errno == 0) {
670 return 0;
673 *perrno = state->sys_errno;
674 return -1;
677 struct writev_state {
678 struct tevent_context *ev;
679 int fd;
680 struct iovec *iov;
681 int count;
682 size_t total_size;
685 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
686 uint16_t flags, void *private_data);
688 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
689 int fd, struct iovec *iov, int count)
691 struct tevent_req *result;
692 struct writev_state *state;
693 struct tevent_fd *fde;
695 result = tevent_req_create(mem_ctx, &state, struct writev_state);
696 if (result == NULL) {
697 return NULL;
699 state->ev = ev;
700 state->fd = fd;
701 state->total_size = 0;
702 state->count = count;
703 state->iov = (struct iovec *)talloc_memdup(
704 state, iov, sizeof(struct iovec) * count);
705 if (state->iov == NULL) {
706 goto fail;
709 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, writev_handler,
710 result);
711 if (fde == NULL) {
712 goto fail;
714 return result;
716 fail:
717 TALLOC_FREE(result);
718 return NULL;
721 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
722 uint16_t flags, void *private_data)
724 struct tevent_req *req = talloc_get_type_abort(
725 private_data, struct tevent_req);
726 struct writev_state *state = talloc_get_type_abort(
727 req->private_state, struct writev_state);
728 size_t to_write, written;
729 int i;
731 to_write = 0;
733 for (i=0; i<state->count; i++) {
734 to_write += state->iov[i].iov_len;
737 written = sys_writev(state->fd, state->iov, state->count);
738 if (written == -1) {
739 tevent_req_error(req, errno);
740 return;
742 if (written == 0) {
743 tevent_req_error(req, EPIPE);
744 return;
746 state->total_size += written;
748 if (written == to_write) {
749 tevent_req_done(req);
750 return;
754 * We've written less than we were asked to, drop stuff from
755 * state->iov.
758 while (written > 0) {
759 if (written < state->iov[0].iov_len) {
760 state->iov[0].iov_base =
761 (char *)state->iov[0].iov_base + written;
762 state->iov[0].iov_len -= written;
763 break;
765 written = state->iov[0].iov_len;
766 state->iov += 1;
767 state->count -= 1;
771 ssize_t writev_recv(struct tevent_req *req, int *perrno)
773 struct writev_state *state = talloc_get_type_abort(
774 req->private_state, struct writev_state);
776 if (tevent_req_is_unix_error(req, perrno)) {
777 return -1;
779 return state->total_size;
782 struct read_packet_state {
783 int fd;
784 uint8_t *buf;
785 size_t nread;
786 ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
787 void *private_data;
790 static void read_packet_handler(struct tevent_context *ev,
791 struct tevent_fd *fde,
792 uint16_t flags, void *private_data);
794 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
795 struct tevent_context *ev,
796 int fd, size_t initial,
797 ssize_t (*more)(uint8_t *buf,
798 size_t buflen,
799 void *private_data),
800 void *private_data)
802 struct tevent_req *result;
803 struct read_packet_state *state;
804 struct tevent_fd *fde;
806 result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
807 if (result == NULL) {
808 return NULL;
810 state->fd = fd;
811 state->nread = 0;
812 state->more = more;
813 state->private_data = private_data;
815 state->buf = talloc_array(state, uint8_t, initial);
816 if (state->buf == NULL) {
817 goto fail;
820 fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
821 result);
822 if (fde == NULL) {
823 goto fail;
825 return result;
826 fail:
827 TALLOC_FREE(result);
828 return NULL;
831 static void read_packet_handler(struct tevent_context *ev,
832 struct tevent_fd *fde,
833 uint16_t flags, void *private_data)
835 struct tevent_req *req = talloc_get_type_abort(
836 private_data, struct tevent_req);
837 struct read_packet_state *state = talloc_get_type_abort(
838 req->private_state, struct read_packet_state);
839 size_t total = talloc_get_size(state->buf);
840 ssize_t nread, more;
841 uint8_t *tmp;
843 nread = read(state->fd, state->buf+state->nread, total-state->nread);
844 if (nread == -1) {
845 tevent_req_error(req, errno);
846 return;
848 if (nread == 0) {
849 tevent_req_error(req, EPIPE);
850 return;
853 state->nread += nread;
854 if (state->nread < total) {
855 /* Come back later */
856 return;
860 * We got what was initially requested. See if "more" asks for -- more.
862 if (state->more == NULL) {
863 /* Nobody to ask, this is a async read_data */
864 tevent_req_done(req);
865 return;
868 more = state->more(state->buf, total, state->private_data);
869 if (more == -1) {
870 /* We got an invalid packet, tell the caller */
871 tevent_req_error(req, EIO);
872 return;
874 if (more == 0) {
875 /* We're done, full packet received */
876 tevent_req_done(req);
877 return;
880 tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
881 if (tevent_req_nomem(tmp, req)) {
882 return;
884 state->buf = tmp;
887 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
888 uint8_t **pbuf, int *perrno)
890 struct read_packet_state *state = talloc_get_type_abort(
891 req->private_state, struct read_packet_state);
893 if (tevent_req_is_unix_error(req, perrno)) {
894 return -1;
896 *pbuf = talloc_move(mem_ctx, &state->buf);
897 return talloc_get_size(*pbuf);