tdb: Add new function tdb_transaction_active()
[Samba.git] / lib / async_req / async_sock.c
blobdb3916e07e7a2f718d8ec0d9fdfd2f50fc4e890d
1 /*
2 Unix SMB/CIFS implementation.
3 async socket syscalls
4 Copyright (C) Volker Lendecke 2008
6 ** NOTE! The following LGPL license applies to the async_sock
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Library General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30 #include "lib/util/iov_buf.h"
32 /* Note: lib/util/ is currently GPL */
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/samba_util.h"
36 struct async_connect_state {
37 int fd;
38 struct tevent_fd *fde;
39 int result;
40 long old_sockflags;
41 socklen_t address_len;
42 struct sockaddr_storage address;
44 void (*before_connect)(void *private_data);
45 void (*after_connect)(void *private_data);
46 void *private_data;
49 static void async_connect_cleanup(struct tevent_req *req,
50 enum tevent_req_state req_state);
51 static void async_connect_connected(struct tevent_context *ev,
52 struct tevent_fd *fde, uint16_t flags,
53 void *priv);
55 /**
56 * @brief async version of connect(2)
57 * @param[in] mem_ctx The memory context to hang the result off
58 * @param[in] ev The event context to work from
59 * @param[in] fd The socket to recv from
60 * @param[in] address Where to connect?
61 * @param[in] address_len Length of *address
62 * @retval The async request
64 * This function sets the socket into non-blocking state to be able to call
65 * connect in an async state. This will be reset when the request is finished.
68 struct tevent_req *async_connect_send(
69 TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
70 const struct sockaddr *address, socklen_t address_len,
71 void (*before_connect)(void *private_data),
72 void (*after_connect)(void *private_data),
73 void *private_data)
75 struct tevent_req *req;
76 struct async_connect_state *state;
77 int ret;
79 req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
80 if (req == NULL) {
81 return NULL;
84 /**
85 * We have to set the socket to nonblocking for async connect(2). Keep
86 * the old sockflags around.
89 state->fd = fd;
90 state->before_connect = before_connect;
91 state->after_connect = after_connect;
92 state->private_data = private_data;
94 state->old_sockflags = fcntl(fd, F_GETFL, 0);
95 if (state->old_sockflags == -1) {
96 tevent_req_error(req, errno);
97 return tevent_req_post(req, ev);
100 tevent_req_set_cleanup_fn(req, async_connect_cleanup);
102 state->address_len = address_len;
103 if (address_len > sizeof(state->address)) {
104 tevent_req_error(req, EINVAL);
105 return tevent_req_post(req, ev);
107 memcpy(&state->address, address, address_len);
109 ret = set_blocking(fd, false);
110 if (ret == -1) {
111 tevent_req_error(req, errno);
112 return tevent_req_post(req, ev);
115 if (state->before_connect != NULL) {
116 state->before_connect(state->private_data);
119 state->result = connect(fd, address, address_len);
121 if (state->after_connect != NULL) {
122 state->after_connect(state->private_data);
125 if (state->result == 0) {
126 tevent_req_done(req);
127 return tevent_req_post(req, ev);
131 * The only errno indicating that an initial connect is still
132 * in flight is EINPROGRESS.
134 * We get EALREADY when someone calls us a second time for a
135 * given fd and the connect is still in flight (and returned
136 * EINPROGRESS the first time).
138 * This allows callers like open_socket_out_send() to reuse
139 * fds and call us with an fd for which the connect is still
140 * in flight. The proper thing to do for callers would be
141 * closing the fd and starting from scratch with a fresh
142 * socket.
145 if (errno != EINPROGRESS && errno != EALREADY) {
146 tevent_req_error(req, errno);
147 return tevent_req_post(req, ev);
150 state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE,
151 async_connect_connected, req);
152 if (state->fde == NULL) {
153 tevent_req_error(req, ENOMEM);
154 return tevent_req_post(req, ev);
156 return req;
159 static void async_connect_cleanup(struct tevent_req *req,
160 enum tevent_req_state req_state)
162 struct async_connect_state *state =
163 tevent_req_data(req, struct async_connect_state);
165 TALLOC_FREE(state->fde);
166 if (state->fd != -1) {
167 int ret;
169 ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
170 if (ret == -1) {
171 abort();
174 state->fd = -1;
179 * fde event handler for connect(2)
180 * @param[in] ev The event context that sent us here
181 * @param[in] fde The file descriptor event associated with the connect
182 * @param[in] flags Indicate read/writeability of the socket
183 * @param[in] priv private data, "struct async_req *" in this case
186 static void async_connect_connected(struct tevent_context *ev,
187 struct tevent_fd *fde, uint16_t flags,
188 void *priv)
190 struct tevent_req *req = talloc_get_type_abort(
191 priv, struct tevent_req);
192 struct async_connect_state *state =
193 tevent_req_data(req, struct async_connect_state);
194 int ret;
195 int socket_error = 0;
196 socklen_t slen = sizeof(socket_error);
198 ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
199 &socket_error, &slen);
201 if (ret != 0) {
203 * According to Stevens this is the Solaris behaviour
204 * in case the connection encountered an error:
205 * getsockopt() fails, error is in errno
207 tevent_req_error(req, errno);
208 return;
211 if (socket_error != 0) {
213 * Berkeley derived implementations (including) Linux
214 * return the pending error via socket_error.
216 tevent_req_error(req, socket_error);
217 return;
220 tevent_req_done(req);
221 return;
224 int async_connect_recv(struct tevent_req *req, int *perrno)
226 int err = tevent_req_simple_recv_unix(req);
228 if (err != 0) {
229 *perrno = err;
230 return -1;
233 return 0;
236 struct writev_state {
237 struct tevent_context *ev;
238 struct tevent_queue_entry *queue_entry;
239 int fd;
240 struct tevent_fd *fde;
241 struct iovec *iov;
242 int count;
243 size_t total_size;
244 uint16_t flags;
245 bool err_on_readability;
248 static void writev_cleanup(struct tevent_req *req,
249 enum tevent_req_state req_state);
250 static bool writev_cancel(struct tevent_req *req);
251 static void writev_trigger(struct tevent_req *req, void *private_data);
252 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
253 uint16_t flags, void *private_data);
255 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
256 struct tevent_queue *queue, int fd,
257 bool err_on_readability,
258 struct iovec *iov, int count)
260 struct tevent_req *req;
261 struct writev_state *state;
263 req = tevent_req_create(mem_ctx, &state, struct writev_state);
264 if (req == NULL) {
265 return NULL;
267 state->ev = ev;
268 state->fd = fd;
269 state->total_size = 0;
270 state->count = count;
271 state->iov = (struct iovec *)talloc_memdup(
272 state, iov, sizeof(struct iovec) * count);
273 if (tevent_req_nomem(state->iov, req)) {
274 return tevent_req_post(req, ev);
276 state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
277 state->err_on_readability = err_on_readability;
279 tevent_req_set_cleanup_fn(req, writev_cleanup);
280 tevent_req_set_cancel_fn(req, writev_cancel);
282 if (queue == NULL) {
283 state->fde = tevent_add_fd(state->ev, state, state->fd,
284 state->flags, writev_handler, req);
285 if (tevent_req_nomem(state->fde, req)) {
286 return tevent_req_post(req, ev);
288 return req;
291 state->queue_entry = tevent_queue_add_entry(queue, ev, req,
292 writev_trigger, NULL);
293 if (tevent_req_nomem(state->queue_entry, req)) {
294 return tevent_req_post(req, ev);
296 return req;
299 static void writev_cleanup(struct tevent_req *req,
300 enum tevent_req_state req_state)
302 struct writev_state *state = tevent_req_data(req, struct writev_state);
304 TALLOC_FREE(state->queue_entry);
305 TALLOC_FREE(state->fde);
308 static bool writev_cancel(struct tevent_req *req)
310 struct writev_state *state = tevent_req_data(req, struct writev_state);
312 TALLOC_FREE(state->queue_entry);
313 TALLOC_FREE(state->fde);
315 if (state->count == 0) {
317 * already completed.
319 return false;
322 tevent_req_defer_callback(req, state->ev);
323 if (state->total_size > 0) {
325 * We've already started to write :-(
327 tevent_req_error(req, EIO);
328 return false;
331 tevent_req_error(req, ECANCELED);
332 return true;
335 static void writev_trigger(struct tevent_req *req, void *private_data)
337 struct writev_state *state = tevent_req_data(req, struct writev_state);
339 state->queue_entry = NULL;
341 state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
342 writev_handler, req);
343 if (tevent_req_nomem(state->fde, req)) {
344 return;
348 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
349 uint16_t flags, void *private_data)
351 struct tevent_req *req = talloc_get_type_abort(
352 private_data, struct tevent_req);
353 struct writev_state *state =
354 tevent_req_data(req, struct writev_state);
355 ssize_t written;
356 bool ok;
358 if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
359 int ret, value;
361 if (state->err_on_readability) {
362 /* Readable and the caller wants an error on read. */
363 tevent_req_error(req, EPIPE);
364 return;
367 /* Might be an error. Check if there are bytes to read */
368 ret = ioctl(state->fd, FIONREAD, &value);
369 /* FIXME - should we also check
370 for ret == 0 and value == 0 here ? */
371 if (ret == -1) {
372 /* There's an error. */
373 tevent_req_error(req, EPIPE);
374 return;
376 /* A request for TEVENT_FD_READ will succeed from now and
377 forevermore until the bytes are read so if there was
378 an error we'll wait until we do read, then get it in
379 the read callback function. Until then, remove TEVENT_FD_READ
380 from the flags we're waiting for. */
381 state->flags &= ~TEVENT_FD_READ;
382 TEVENT_FD_NOT_READABLE(fde);
384 /* If not writable, we're done. */
385 if (!(flags & TEVENT_FD_WRITE)) {
386 return;
390 written = writev(state->fd, state->iov, state->count);
391 if ((written == -1) && (errno == EINTR)) {
392 /* retry */
393 return;
395 if (written == -1) {
396 tevent_req_error(req, errno);
397 return;
399 if (written == 0) {
400 tevent_req_error(req, EPIPE);
401 return;
403 state->total_size += written;
405 ok = iov_advance(&state->iov, &state->count, written);
406 if (!ok) {
407 tevent_req_error(req, EIO);
408 return;
411 if (state->count == 0) {
412 tevent_req_done(req);
413 return;
417 ssize_t writev_recv(struct tevent_req *req, int *perrno)
419 struct writev_state *state =
420 tevent_req_data(req, struct writev_state);
421 ssize_t ret;
423 if (tevent_req_is_unix_error(req, perrno)) {
424 tevent_req_received(req);
425 return -1;
427 ret = state->total_size;
428 tevent_req_received(req);
429 return ret;
432 struct read_packet_state {
433 int fd;
434 struct tevent_fd *fde;
435 uint8_t *buf;
436 size_t nread;
437 ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
438 void *private_data;
441 static void read_packet_cleanup(struct tevent_req *req,
442 enum tevent_req_state req_state);
443 static void read_packet_handler(struct tevent_context *ev,
444 struct tevent_fd *fde,
445 uint16_t flags, void *private_data);
447 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
448 struct tevent_context *ev,
449 int fd, size_t initial,
450 ssize_t (*more)(uint8_t *buf,
451 size_t buflen,
452 void *private_data),
453 void *private_data)
455 struct tevent_req *req;
456 struct read_packet_state *state;
458 req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
459 if (req == NULL) {
460 return NULL;
462 state->fd = fd;
463 state->nread = 0;
464 state->more = more;
465 state->private_data = private_data;
467 tevent_req_set_cleanup_fn(req, read_packet_cleanup);
469 state->buf = talloc_array(state, uint8_t, initial);
470 if (tevent_req_nomem(state->buf, req)) {
471 return tevent_req_post(req, ev);
474 state->fde = tevent_add_fd(ev, state, fd,
475 TEVENT_FD_READ, read_packet_handler,
476 req);
477 if (tevent_req_nomem(state->fde, req)) {
478 return tevent_req_post(req, ev);
480 return req;
483 static void read_packet_cleanup(struct tevent_req *req,
484 enum tevent_req_state req_state)
486 struct read_packet_state *state =
487 tevent_req_data(req, struct read_packet_state);
489 TALLOC_FREE(state->fde);
492 static void read_packet_handler(struct tevent_context *ev,
493 struct tevent_fd *fde,
494 uint16_t flags, void *private_data)
496 struct tevent_req *req = talloc_get_type_abort(
497 private_data, struct tevent_req);
498 struct read_packet_state *state =
499 tevent_req_data(req, struct read_packet_state);
500 size_t total = talloc_get_size(state->buf);
501 ssize_t nread, more;
502 uint8_t *tmp;
504 nread = recv(state->fd, state->buf+state->nread, total-state->nread,
506 if ((nread == -1) && (errno == ENOTSOCK)) {
507 nread = read(state->fd, state->buf+state->nread,
508 total-state->nread);
510 if ((nread == -1) && (errno == EINTR)) {
511 /* retry */
512 return;
514 if (nread == -1) {
515 tevent_req_error(req, errno);
516 return;
518 if (nread == 0) {
519 tevent_req_error(req, EPIPE);
520 return;
523 state->nread += nread;
524 if (state->nread < total) {
525 /* Come back later */
526 return;
530 * We got what was initially requested. See if "more" asks for -- more.
532 if (state->more == NULL) {
533 /* Nobody to ask, this is a async read_data */
534 tevent_req_done(req);
535 return;
538 more = state->more(state->buf, total, state->private_data);
539 if (more == -1) {
540 /* We got an invalid packet, tell the caller */
541 tevent_req_error(req, EIO);
542 return;
544 if (more == 0) {
545 /* We're done, full packet received */
546 tevent_req_done(req);
547 return;
550 if (total + more < total) {
551 tevent_req_error(req, EMSGSIZE);
552 return;
555 tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
556 if (tevent_req_nomem(tmp, req)) {
557 return;
559 state->buf = tmp;
562 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
563 uint8_t **pbuf, int *perrno)
565 struct read_packet_state *state =
566 tevent_req_data(req, struct read_packet_state);
568 if (tevent_req_is_unix_error(req, perrno)) {
569 tevent_req_received(req);
570 return -1;
572 *pbuf = talloc_move(mem_ctx, &state->buf);
573 tevent_req_received(req);
574 return talloc_get_size(*pbuf);
577 struct wait_for_read_state {
578 struct tevent_fd *fde;
579 int fd;
580 bool check_errors;
583 static void wait_for_read_cleanup(struct tevent_req *req,
584 enum tevent_req_state req_state);
585 static void wait_for_read_done(struct tevent_context *ev,
586 struct tevent_fd *fde,
587 uint16_t flags,
588 void *private_data);
590 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
591 struct tevent_context *ev, int fd,
592 bool check_errors)
594 struct tevent_req *req;
595 struct wait_for_read_state *state;
597 req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
598 if (req == NULL) {
599 return NULL;
602 tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
604 state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
605 wait_for_read_done, req);
606 if (tevent_req_nomem(state->fde, req)) {
607 return tevent_req_post(req, ev);
610 state->fd = fd;
611 state->check_errors = check_errors;
612 return req;
615 static void wait_for_read_cleanup(struct tevent_req *req,
616 enum tevent_req_state req_state)
618 struct wait_for_read_state *state =
619 tevent_req_data(req, struct wait_for_read_state);
621 TALLOC_FREE(state->fde);
624 static void wait_for_read_done(struct tevent_context *ev,
625 struct tevent_fd *fde,
626 uint16_t flags,
627 void *private_data)
629 struct tevent_req *req = talloc_get_type_abort(
630 private_data, struct tevent_req);
631 struct wait_for_read_state *state =
632 tevent_req_data(req, struct wait_for_read_state);
633 ssize_t nread;
634 char c;
636 if ((flags & TEVENT_FD_READ) == 0) {
637 return;
640 if (!state->check_errors) {
641 tevent_req_done(req);
642 return;
645 nread = recv(state->fd, &c, 1, MSG_PEEK);
647 if (nread == 0) {
648 tevent_req_error(req, EPIPE);
649 return;
652 if ((nread == -1) && (errno == EINTR)) {
653 /* come back later */
654 return;
657 if ((nread == -1) && (errno == ENOTSOCK)) {
658 /* Ignore this specific error on pipes */
659 tevent_req_done(req);
660 return;
663 if (nread == -1) {
664 tevent_req_error(req, errno);
665 return;
668 tevent_req_done(req);
671 bool wait_for_read_recv(struct tevent_req *req, int *perr)
673 int err = tevent_req_simple_recv_unix(req);
675 if (err != 0) {
676 *perr = err;
677 return false;
680 return true;
683 struct accept_state {
684 struct tevent_fd *fde;
685 int listen_sock;
686 socklen_t addrlen;
687 struct sockaddr_storage addr;
688 int sock;
691 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
692 uint16_t flags, void *private_data);
694 struct tevent_req *accept_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
695 int listen_sock)
697 struct tevent_req *req;
698 struct accept_state *state;
700 req = tevent_req_create(mem_ctx, &state, struct accept_state);
701 if (req == NULL) {
702 return NULL;
705 state->listen_sock = listen_sock;
707 state->fde = tevent_add_fd(ev, state, listen_sock, TEVENT_FD_READ,
708 accept_handler, req);
709 if (tevent_req_nomem(state->fde, req)) {
710 return tevent_req_post(req, ev);
712 return req;
715 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
716 uint16_t flags, void *private_data)
718 struct tevent_req *req = talloc_get_type_abort(
719 private_data, struct tevent_req);
720 struct accept_state *state = tevent_req_data(req, struct accept_state);
721 int ret;
723 TALLOC_FREE(state->fde);
725 if ((flags & TEVENT_FD_READ) == 0) {
726 tevent_req_error(req, EIO);
727 return;
729 state->addrlen = sizeof(state->addr);
731 ret = accept(state->listen_sock, (struct sockaddr *)&state->addr,
732 &state->addrlen);
733 if ((ret == -1) && (errno == EINTR)) {
734 /* retry */
735 return;
737 if (ret == -1) {
738 tevent_req_error(req, errno);
739 return;
741 state->sock = ret;
742 tevent_req_done(req);
745 int accept_recv(struct tevent_req *req, struct sockaddr_storage *paddr,
746 socklen_t *paddrlen, int *perr)
748 struct accept_state *state = tevent_req_data(req, struct accept_state);
749 int err;
751 if (tevent_req_is_unix_error(req, &err)) {
752 if (perr != NULL) {
753 *perr = err;
755 return -1;
757 if (paddr != NULL) {
758 memcpy(paddr, &state->addr, state->addrlen);
760 if (paddrlen != NULL) {
761 *paddrlen = state->addrlen;
763 return state->sock;