CVE-2023-4091: smbd: use open_access_mask for access check in open_file()
source3/lib/ctdbd_conn.c
/*
   Unix SMB/CIFS implementation.
   Samba internal messaging functions
   Copyright (C) 2007 by Volker Lendecke
   Copyright (C) 2007 by Andrew Tridgell

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include <tevent.h>
#include "util_tdb.h"
#include "serverid.h"
#include "ctdbd_conn.h"
#include "system/select.h"
#include "lib/util/util_net.h"
#include "lib/util/sys_rw_data.h"
#include "lib/util/iov_buf.h"
#include "lib/util/select.h"
#include "lib/util/debug.h"
#include "lib/util/talloc_stack.h"
#include "lib/util/genrand.h"
#include "lib/util/fault.h"
#include "lib/util/dlinklist.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/sys_rw.h"
#include "lib/util/blocking.h"
#include "ctdb/include/ctdb_protocol.h"
#include "lib/async_req/async_sock.h"

/* paths to these include files come from --with-ctdb= in configure */

struct ctdbd_srvid_cb {
	uint64_t srvid;
	int (*cb)(struct tevent_context *ev,
		  uint32_t src_vnn, uint32_t dst_vnn,
		  uint64_t dst_srvid,
		  const uint8_t *msg, size_t msglen,
		  void *private_data);
	void *private_data;
};

struct ctdbd_connection {
	uint32_t reqid;
	uint32_t our_vnn;
	uint64_t rand_srvid;
	struct ctdbd_srvid_cb *callbacks;
	int fd;
	int timeout;

	/*
	 * Outgoing queue for writev_send of asynchronous ctdb requests
	 */
	struct tevent_queue *outgoing;
	struct tevent_req **pending;
	struct tevent_req *read_req;
};

static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
{
	size_t len = talloc_array_length(conn->pending);
	return (len != 0);
}

static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
{
	conn->reqid += 1;
	if (conn->reqid == 0) {
		conn->reqid += 1;
	}
	return conn->reqid;
}

static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn, uint32_t opcode,
			 uint64_t srvid, uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			 int32_t *cstatus);

/*
 * exit on fatal communications errors with the ctdbd daemon
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
	/* we don't use smb_panic() as we don't want to delay to write
	   a core file. We need to release this process id immediately
	   so that someone else can take over without getting sharing
	   violations */
	_exit(1);
}

static void ctdb_packet_dump(struct ctdb_req_header *hdr)
{
	if (DEBUGLEVEL < 11) {
		return;
	}
	DEBUGADD(11, ("len=%"PRIu32", magic=%"PRIu32", vers=%"PRIu32", "
		      "gen=%"PRIu32", op=%"PRIu32", reqid=%"PRIu32"\n",
		      hdr->length,
		      hdr->ctdb_magic,
		      hdr->ctdb_version,
		      hdr->generation,
		      hdr->operation,
		      hdr->reqid));
}

/*
 * Register a srvid with ctdbd
 */
int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
			int (*cb)(struct tevent_context *ev,
				  uint32_t src_vnn, uint32_t dst_vnn,
				  uint64_t dst_srvid,
				  const uint8_t *msg, size_t msglen,
				  void *private_data),
			void *private_data)
{
	int ret;
	int32_t cstatus;
	size_t num_callbacks;
	struct ctdbd_srvid_cb *tmp;

	ret = ctdbd_control_local(conn, CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
				  tdb_null, NULL, NULL, &cstatus);
	if (ret != 0) {
		return ret;
	}

	num_callbacks = talloc_array_length(conn->callbacks);

	tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
			     num_callbacks + 1);
	if (tmp == NULL) {
		return ENOMEM;
	}
	conn->callbacks = tmp;

	conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
		.srvid = srvid, .cb = cb, .private_data = private_data
	};

	return 0;
}

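/*
 * Illustrative sketch (not part of the original file): a caller would
 * typically register a message callback for a well-known srvid like this.
 * The callback shape matches struct ctdbd_srvid_cb above; the callback name
 * and its body are hypothetical.
 *
 *	static int example_msg_cb(struct tevent_context *ev,
 *				  uint32_t src_vnn, uint32_t dst_vnn,
 *				  uint64_t dst_srvid,
 *				  const uint8_t *msg, size_t msglen,
 *				  void *private_data)
 *	{
 *		DBG_DEBUG("got %zu bytes from node %"PRIu32"\n",
 *			  msglen, src_vnn);
 *		return 0;
 *	}
 *
 *	ret = register_with_ctdbd(conn, CTDB_SRVID_SAMBA_NOTIFY,
 *				  example_msg_cb, NULL);
 */
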
static int ctdbd_msg_call_back(struct tevent_context *ev,
			       struct ctdbd_connection *conn,
			       struct ctdb_req_message_old *msg)
{
	uint32_t msg_len;
	size_t i, num_callbacks;

	msg_len = msg->hdr.length;
	if (msg_len < offsetof(struct ctdb_req_message_old, data)) {
		DBG_DEBUG("len %"PRIu32" too small\n", msg_len);
		return 0;
	}
	msg_len -= offsetof(struct ctdb_req_message_old, data);

	if (msg_len < msg->datalen) {
		DBG_DEBUG("msg_len=%"PRIu32" < msg->datalen=%"PRIu32"\n",
			  msg_len, msg->datalen);
		return 0;
	}

	num_callbacks = talloc_array_length(conn->callbacks);

	for (i=0; i<num_callbacks; i++) {
		struct ctdbd_srvid_cb *cb = &conn->callbacks[i];

		if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
			int ret;

			ret = cb->cb(ev,
				     msg->hdr.srcnode, msg->hdr.destnode,
				     msg->srvid, msg->data, msg->datalen,
				     cb->private_data);
			if (ret != 0) {
				return ret;
			}
		}
	}
	return 0;
}

/*
 * get our vnn from the cluster
 */
static int get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
{
	int32_t cstatus=-1;
	int ret;
	ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_PNN, 0, 0,
				  tdb_null, NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
		return ret;
	}
	*vnn = (uint32_t)cstatus;
	return ret;
}

/*
 * Are we active (i.e. not banned or stopped?)
 */
static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
{
	int32_t cstatus=-1;
	TDB_DATA outdata = {0};
	struct ctdb_node_map_old *m;
	bool ok = false;
	uint32_t i;
	int ret;

	ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_NODEMAP, 0, 0,
				  tdb_null, talloc_tos(), &outdata, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
		return false;
	}
	if ((cstatus != 0) || (outdata.dptr == NULL)) {
		DEBUG(2, ("Received invalid ctdb data\n"));
		goto fail;
	}

	m = (struct ctdb_node_map_old *)outdata.dptr;

	for (i=0; i<m->num; i++) {
		if (vnn == m->nodes[i].pnn) {
			break;
		}
	}

	if (i == m->num) {
		DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
			  (int)vnn));
		goto fail;
	}

	if ((m->nodes[i].flags & NODE_FLAGS_INACTIVE) != 0) {
		DEBUG(2, ("Node has status %x, not active\n",
			  (int)m->nodes[i].flags));
		goto fail;
	}

	ok = true;
fail:
	TALLOC_FREE(outdata.dptr);
	return ok;
}

uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
{
	return conn->our_vnn;
}

/*
 * Get us a ctdb connection
 */

static int ctdbd_connect(const char *sockname, int *pfd)
{
	struct samba_sockaddr addr = {
		.sa_socklen = sizeof(struct sockaddr_un),
		.u = {
			.un = {
				.sun_family = AF_UNIX,
			},
		},
	};
	int fd;
	size_t namelen;
	int ret;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd == -1) {
		int err = errno;
		DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
		return err;
	}

	namelen = strlcpy(addr.u.un.sun_path,
			  sockname,
			  sizeof(addr.u.un.sun_path));
	if (namelen >= sizeof(addr.u.un.sun_path)) {
		DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
			  sockname));
		close(fd);
		return ENAMETOOLONG;
	}

	ret = connect(fd, &addr.u.sa, addr.sa_socklen);
	if (ret == -1) {
		int err = errno;
		DEBUG(1, ("connect(%s) failed: %s\n", sockname,
			  strerror(err)));
		close(fd);
		return err;
	}

	*pfd = fd;
	return 0;
}

static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
			    struct ctdb_req_header **result)
{
	struct ctdb_req_header *req;
	uint32_t msglen;
	ssize_t nread;

	if (timeout != -1) {
		struct pollfd pfd = { .fd = fd, .events = POLLIN };
		int ret;

		ret = sys_poll_intr(&pfd, 1, timeout);
		if (ret == -1) {
			return errno;
		}
		if (ret == 0) {
			return ETIMEDOUT;
		}
		if (ret != 1) {
			return EIO;
		}
	}

	nread = read_data(fd, &msglen, sizeof(msglen));
	if (nread == -1) {
		return errno;
	}
	if (nread == 0) {
		return EIO;
	}

	if (msglen < sizeof(struct ctdb_req_header)) {
		return EIO;
	}

	req = talloc_size(mem_ctx, msglen);
	if (req == NULL) {
		return ENOMEM;
	}
	talloc_set_name_const(req, "struct ctdb_req_header");

	req->length = msglen;

	nread = read_data(fd, ((char *)req) + sizeof(msglen),
			  msglen - sizeof(msglen));
	if (nread == -1) {
		TALLOC_FREE(req);
		return errno;
	}
	if (nread == 0) {
		TALLOC_FREE(req);
		return EIO;
	}

	*result = req;
	return 0;
}

/*
 * Read a full ctdbd request. If we have a messaging context, defer incoming
 * messages that might come in between.
 */

static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
			 TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
{
	struct ctdb_req_header *hdr = NULL;
	int ret;

 next_pkt:

	ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
	if (ret != 0) {
		DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
		cluster_fatal("failed to read data from ctdbd\n");
		return -1;
	}
	SMB_ASSERT(hdr != NULL);

	DEBUG(11, ("Received ctdb packet\n"));
	ctdb_packet_dump(hdr);

	if (hdr->operation == CTDB_REQ_MESSAGE) {
		struct ctdb_req_message_old *msg = (struct ctdb_req_message_old *)hdr;

		ret = ctdbd_msg_call_back(NULL, conn, msg);
		if (ret != 0) {
			TALLOC_FREE(hdr);
			return ret;
		}

		TALLOC_FREE(hdr);
		goto next_pkt;
	}

	if ((reqid != 0) && (hdr->reqid != reqid)) {
		/* we got the wrong reply */
		DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
			 "been %u\n", hdr->reqid, reqid));
		TALLOC_FREE(hdr);
		goto next_pkt;
	}

	*result = talloc_move(mem_ctx, &hdr);

	return 0;
}

static int ctdbd_connection_destructor(struct ctdbd_connection *c);

/*
 * Get us a ctdbd connection
 */

static int ctdbd_init_connection_internal(TALLOC_CTX *mem_ctx,
					  const char *sockname, int timeout,
					  struct ctdbd_connection *conn)
{
	int ret;

	conn->timeout = timeout;
	if (conn->timeout == 0) {
		conn->timeout = -1;
	}

	ret = ctdbd_connect(sockname, &conn->fd);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
		return ret;
	}
	talloc_set_destructor(conn, ctdbd_connection_destructor);

	ret = get_cluster_vnn(conn, &conn->our_vnn);
	if (ret != 0) {
		DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret)));
		return ret;
	}

	if (!ctdbd_working(conn, conn->our_vnn)) {
		DEBUG(2, ("Node is not working, can not connect\n"));
		return EIO;
	}

	generate_random_buffer((unsigned char *)&conn->rand_srvid,
			       sizeof(conn->rand_srvid));

	ret = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
	if (ret != 0) {
		DEBUG(5, ("Could not register random srvid: %s\n",
			  strerror(ret)));
		return ret;
	}

	return 0;
}

int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
			  const char *sockname, int timeout,
			  struct ctdbd_connection **pconn)
{
	struct ctdbd_connection *conn;
	int ret;

	if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
		DEBUG(0, ("talloc failed\n"));
		return ENOMEM;
	}

	ret = ctdbd_init_connection_internal(mem_ctx,
					     sockname,
					     timeout,
					     conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
			strerror(ret));
		goto fail;
	}

	*pconn = conn;
	return 0;

 fail:
	TALLOC_FREE(conn);
	return ret;
}

int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
			    const char *sockname, int timeout,
			    struct ctdbd_connection *conn)
{
	int ret;

	ret = ctdbd_connection_destructor(conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_connection_destructor failed\n");
		return ret;
	}

	ret = ctdbd_init_connection_internal(mem_ctx,
					     sockname,
					     timeout,
					     conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
			strerror(ret));
		return ret;
	}

	return 0;
}

int ctdbd_init_async_connection(
	TALLOC_CTX *mem_ctx,
	const char *sockname,
	int timeout,
	struct ctdbd_connection **pconn)
{
	struct ctdbd_connection *conn = NULL;
	int ret;

	*pconn = NULL;

	ret = ctdbd_init_connection(mem_ctx, sockname, timeout, &conn);
	if (ret != 0) {
		return ret;
	}

	ret = set_blocking(conn->fd, false);
	if (ret == -1) {
		int err = errno;
		SMB_ASSERT(err != 0);
		TALLOC_FREE(conn);
		return err;
	}

	conn->outgoing = tevent_queue_create(conn, "ctdb async outgoing");
	if (conn->outgoing == NULL) {
		TALLOC_FREE(conn);
		return ENOMEM;
	}

	*pconn = conn;
	return 0;
}

int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
{
	return conn->fd;
}

/*
 * Packet handler to receive and handle a ctdb message
 */
static int ctdb_handle_message(struct tevent_context *ev,
			       struct ctdbd_connection *conn,
			       struct ctdb_req_header *hdr)
{
	struct ctdb_req_message_old *msg;

	if (hdr->operation != CTDB_REQ_MESSAGE) {
		DEBUG(0, ("Received async msg of type %u, discarding\n",
			  hdr->operation));
		return EINVAL;
	}

	msg = (struct ctdb_req_message_old *)hdr;

	ctdbd_msg_call_back(ev, conn, msg);

	return 0;
}

void ctdbd_socket_readable(struct tevent_context *ev,
			   struct ctdbd_connection *conn)
{
	struct ctdb_req_header *hdr = NULL;
	int ret;

	ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
	if (ret != 0) {
		DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
		cluster_fatal("failed to read data from ctdbd\n");
	}
	SMB_ASSERT(hdr != NULL);

	ret = ctdb_handle_message(ev, conn, hdr);

	TALLOC_FREE(hdr);

	if (ret != 0) {
		DEBUG(10, ("could not handle incoming message: %s\n",
			   strerror(ret)));
	}
}

int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
			     uint32_t dst_vnn, uint64_t dst_srvid,
			     const struct iovec *iov, int iovlen)
{
	struct ctdb_req_message_old r;
	struct iovec iov2[iovlen+1];
	size_t buflen = iov_buflen(iov, iovlen);
	ssize_t nwritten;

	r.hdr.length = offsetof(struct ctdb_req_message_old, data) + buflen;
	r.hdr.ctdb_magic = CTDB_MAGIC;
	r.hdr.ctdb_version = CTDB_PROTOCOL;
	r.hdr.generation = 1;
	r.hdr.operation = CTDB_REQ_MESSAGE;
	r.hdr.destnode = dst_vnn;
	r.hdr.srcnode = conn->our_vnn;
	r.hdr.reqid = 0;
	r.srvid = dst_srvid;
	r.datalen = buflen;

	DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
	ctdb_packet_dump(&r.hdr);

	iov2[0].iov_base = &r;
	iov2[0].iov_len = offsetof(struct ctdb_req_message_old, data);
	memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));

	nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	return 0;
}

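/*
 * Illustrative sketch (not part of the original file): sending a single
 * buffer to a remote srvid simply wraps it in one iovec. The buffer,
 * destination vnn and srvid variables are hypothetical.
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 *
 *	ret = ctdbd_messaging_send_iov(conn, dst_vnn, dst_srvid, &iov, 1);
 */
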
/*
 * send/recv a generic ctdb control message
 */
static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn, uint32_t opcode,
			 uint64_t srvid, uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			 int32_t *cstatus)
{
	struct ctdb_req_control_old req;
	struct ctdb_req_header *hdr;
	struct ctdb_reply_control_old *reply = NULL;
	struct iovec iov[2];
	ssize_t nwritten;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	ZERO_STRUCT(req);
	req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CONTROL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.hdr.destnode = vnn;
	req.opcode = opcode;
	req.srvid = srvid;
	req.datalen = data.dsize;
	req.flags = flags;

	DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32", vnn=%"PRIu32", "
		  "opcode=%"PRIu32", srvid=%"PRIu64"\n", req.hdr.reqid,
		  req.hdr.destnode, req.opcode, req.srvid);
	ctdb_packet_dump(&req.hdr);

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_control_old, data);
	iov[1].iov_base = data.dptr;
	iov[1].iov_len = data.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	if (flags & CTDB_CTRL_FLAG_NOREPLY) {
		if (cstatus) {
			*cstatus = 0;
		}
		return 0;
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		return ret;
	}

	if (hdr->operation != CTDB_REPLY_CONTROL) {
		DEBUG(0, ("received invalid reply\n"));
		TALLOC_FREE(hdr);
		return EIO;
	}
	reply = (struct ctdb_reply_control_old *)hdr;

	if (outdata) {
		if (!(outdata->dptr = (uint8_t *)talloc_memdup(
			      mem_ctx, reply->data, reply->datalen))) {
			TALLOC_FREE(reply);
			return ENOMEM;
		}
		outdata->dsize = reply->datalen;
	}
	if (cstatus) {
		(*cstatus) = reply->status;
	}

	TALLOC_FREE(reply);
	return ret;
}

/*
 * see if a remote process exists
 */
bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn,
			  pid_t pid, uint64_t unique_id)
{
	uint8_t buf[sizeof(pid)+sizeof(unique_id)];
	int32_t cstatus = 0;
	int ret;

	if (unique_id == SERVERID_UNIQUE_ID_NOT_TO_VERIFY) {
		ret = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS,
				    0, 0,
				    (TDB_DATA) { .dptr = (uint8_t *)&pid,
						 .dsize = sizeof(pid) },
				    NULL, NULL, &cstatus);
		if (ret != 0) {
			return false;
		}
		return (cstatus == 0);
	}

	memcpy(buf, &pid, sizeof(pid));
	memcpy(buf+sizeof(pid), &unique_id, sizeof(unique_id));

	ret = ctdbd_control(conn, vnn, CTDB_CONTROL_CHECK_PID_SRVID, 0, 0,
			    (TDB_DATA) { .dptr = buf, .dsize = sizeof(buf) },
			    NULL, NULL, &cstatus);
	if (ret != 0) {
		return false;
	}
	return (cstatus == 0);
}

/*
 * Get a db path
 */
char *ctdbd_dbpath(struct ctdbd_connection *conn,
		   TALLOC_CTX *mem_ctx, uint32_t db_id)
{
	int ret;
	TDB_DATA data;
	TDB_DATA rdata = {0};
	int32_t cstatus = 0;

	data.dptr = (uint8_t*)&db_id;
	data.dsize = sizeof(db_id);

	ret = ctdbd_control_local(conn, CTDB_CONTROL_GETDBPATH, 0, 0, data,
				  mem_ctx, &rdata, &cstatus);
	if ((ret != 0) || cstatus != 0) {
		DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s\n",
			  strerror(ret)));
		TALLOC_FREE(rdata.dptr);
	}

	return (char *)rdata.dptr;
}

/*
 * attach to a ctdb database
 */
int ctdbd_db_attach(struct ctdbd_connection *conn,
		    const char *name, uint32_t *db_id, bool persistent)
{
	int ret;
	TDB_DATA data = {0};
	int32_t cstatus;

	data = string_term_tdb_data(name);

	ret = ctdbd_control_local(conn,
				  persistent
				  ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
				  : CTDB_CONTROL_DB_ATTACH,
				  0, 0, data, NULL, &data, &cstatus);
	if (ret != 0) {
		DEBUG(0, (__location__ " ctdb_control for db_attach "
			  "failed: %s\n", strerror(ret)));
		return ret;
	}

	if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
		DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
		TALLOC_FREE(data.dptr);
		return EIO;
	}

	*db_id = *(uint32_t *)data.dptr;
	talloc_free(data.dptr);

	return 0;
}

/*
 * force the migration of a record to this node
 */
int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
{
	struct ctdb_req_call_old req;
	struct ctdb_req_header *hdr = NULL;
	struct iovec iov[2];
	ssize_t nwritten;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	ZERO_STRUCT(req);

	req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CALL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.flags = CTDB_IMMEDIATE_MIGRATION;
	req.callid = CTDB_NULL_FUNC;
	req.db_id = db_id;
	req.keylen = key.dsize;

	DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
	ctdb_packet_dump(&req.hdr);

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	iov[1].iov_base = key.dptr;
	iov[1].iov_len = key.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		goto fail;
	}

	if (hdr->operation != CTDB_REPLY_CALL) {
		if (hdr->operation == CTDB_REPLY_ERROR) {
			DBG_ERR("received error from ctdb\n");
		} else {
			DBG_ERR("received invalid reply\n");
		}
		ret = EIO;
		goto fail;
	}

 fail:
	TALLOC_FREE(hdr);
	return ret;
}

/*
 * Fetch a record and parse it
 */
int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
		TDB_DATA key, bool local_copy,
		void (*parser)(TDB_DATA key, TDB_DATA data,
			       void *private_data),
		void *private_data)
{
	struct ctdb_req_call_old req;
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply;
	struct iovec iov[2];
	ssize_t nwritten;
	uint32_t flags;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	flags = local_copy ? CTDB_WANT_READONLY : 0;

	ZERO_STRUCT(req);

	req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CALL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.flags = flags;
	req.callid = CTDB_FETCH_FUNC;
	req.db_id = db_id;
	req.keylen = key.dsize;

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	iov[1].iov_base = key.dptr;
	iov[1].iov_len = key.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		goto fail;
	}

	if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
		DEBUG(0, ("received invalid reply\n"));
		ret = EIO;
		goto fail;
	}
	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		ret = ENOENT;
		goto fail;
	}

	parser(key, make_tdb_data(&reply->data[0], reply->datalen),
	       private_data);

	ret = 0;
 fail:
	TALLOC_FREE(hdr);
	return ret;
}

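/*
 * Illustrative sketch (not part of the original file): a parser callback
 * only inspects the record returned for "key" while the reply buffer is
 * still alive. The callback and the length variable below are hypothetical.
 *
 *	static void example_parser(TDB_DATA key, TDB_DATA data,
 *				   void *private_data)
 *	{
 *		size_t *len = private_data;
 *		*len = data.dsize;
 *	}
 *
 *	ret = ctdbd_parse(conn, db_id, key, false, example_parser, &len);
 */
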
/*
  Traverse a ctdb database. "conn" must be an otherwise unused
  ctdb_connection where no other messages but the traverse ones are
  expected.
*/

int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
		   void (*fn)(TDB_DATA key, TDB_DATA data,
			      void *private_data),
		   void *private_data)
{
	int ret;
	TDB_DATA key, data;
	struct ctdb_traverse_start t;
	int32_t cstatus = 0;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	t.db_id = db_id;
	t.srvid = conn->rand_srvid;
	t.reqid = ctdbd_next_reqid(conn);

	data.dptr = (uint8_t *)&t;
	data.dsize = sizeof(t);

	ret = ctdbd_control_local(conn, CTDB_CONTROL_TRAVERSE_START,
				  conn->rand_srvid,
				  0, data, NULL, NULL, &cstatus);

	if ((ret != 0) || (cstatus != 0)) {
		DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret),
			 cstatus));

		if (ret == 0) {
			/*
			 * We need a mapping here
			 */
			ret = EIO;
		}
		return ret;
	}

	while (true) {
		struct ctdb_req_header *hdr = NULL;
		struct ctdb_req_message_old *m;
		struct ctdb_rec_data_old *d;

		ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
		if (ret != 0) {
			DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
			cluster_fatal("failed to read data from ctdbd\n");
		}
		SMB_ASSERT(hdr != NULL);

		if (hdr->operation != CTDB_REQ_MESSAGE) {
			DEBUG(0, ("Got operation %u, expected a message\n",
				  (unsigned)hdr->operation));
			return EIO;
		}

		m = (struct ctdb_req_message_old *)hdr;
		d = (struct ctdb_rec_data_old *)&m->data[0];
		if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
			DEBUG(0, ("Got invalid traverse data of length %d\n",
				  (int)m->datalen));
			return EIO;
		}

		key.dsize = d->keylen;
		key.dptr = &d->data[0];
		data.dsize = d->datalen;
		data.dptr = &d->data[d->keylen];

		if (key.dsize == 0 && data.dsize == 0) {
			/* end of traverse */
			return 0;
		}

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(0, ("Got invalid ltdb header length %d\n",
				  (int)data.dsize));
			return EIO;
		}
		data.dsize -= sizeof(struct ctdb_ltdb_header);
		data.dptr += sizeof(struct ctdb_ltdb_header);

		if (fn != NULL) {
			fn(key, data, private_data);
		}
	}
	return 0;
}

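/*
 * Illustrative sketch (not part of the original file): traversal invokes
 * "fn" once per record and returns when the empty end-of-traverse record
 * arrives; the connection should be a dedicated one, as the comment above
 * requires. The callback and counter below are hypothetical.
 *
 *	static void example_count(TDB_DATA key, TDB_DATA data,
 *				  void *private_data)
 *	{
 *		(*(unsigned *)private_data)++;
 *	}
 *
 *	ret = ctdbd_traverse(traverse_conn, db_id, example_count, &count);
 */
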
/*
  This is used to canonicalize a ctdb_sock_addr structure.
*/
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)out;
		if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			out4->sin_len = sizeof(*out);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port = in6->sin6_port;
			memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
		}
	}
#endif
}

/*
 * Register us as a server for a particular tcp connection
 */

int ctdbd_register_ips(struct ctdbd_connection *conn,
		       const struct sockaddr_storage *_server,
		       const struct sockaddr_storage *_client,
		       int (*cb)(struct tevent_context *ev,
				 uint32_t src_vnn, uint32_t dst_vnn,
				 uint64_t dst_srvid,
				 const uint8_t *msg, size_t msglen,
				 void *private_data),
		       void *private_data)
{
	struct ctdb_connection p;
	TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
	int ret;
	struct sockaddr_storage client;
	struct sockaddr_storage server;

	/*
	 * Only one connection so far
	 */

	smbd_ctdb_canonicalize_ip(_client, &client);
	smbd_ctdb_canonicalize_ip(_server, &server);

	switch (client.ss_family) {
	case AF_INET:
		memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
		memcpy(&p.src.ip, &client, sizeof(p.src.ip));
		break;
	case AF_INET6:
		memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
		memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
		break;
	default:
		return EIO;
	}

	/*
	 * We want to be told about IP releases
	 */

	ret = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
				  cb, private_data);
	if (ret != 0) {
		return ret;
	}

	/*
	 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
	 * can send an extra ack to trigger a reset for our client, so it
	 * immediately reconnects
	 */
	ret = ctdbd_control_local(conn,
				  CTDB_CONTROL_TCP_CLIENT, 0,
				  CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
				  NULL);
	if (ret != 0) {
		return ret;
	}
	return 0;
}

static int ctdbd_control_get_public_ips(struct ctdbd_connection *conn,
					uint32_t flags,
					TALLOC_CTX *mem_ctx,
					struct ctdb_public_ip_list_old **_ips)
{
	struct ctdb_public_ip_list_old *ips = NULL;
	TDB_DATA outdata;
	int32_t cstatus = -1;
	size_t min_dsize;
	size_t max_ips;
	int ret;

	*_ips = NULL;

	ret = ctdbd_control_local(conn,
				  CTDB_CONTROL_GET_PUBLIC_IPS,
				  0, /* srvid */
				  flags,
				  tdb_null, /* indata */
				  mem_ctx,
				  &outdata,
				  &cstatus);
	if (ret != 0 || cstatus != 0) {
		DBG_ERR("ctdb_control for getpublicips failed ret:%d cstatus:%d\n",
			ret, (int)cstatus);
		return -1;
	}

	min_dsize = offsetof(struct ctdb_public_ip_list_old, ips);
	if (outdata.dsize < min_dsize) {
		DBG_ERR("outdata.dsize=%zu < min_dsize=%zu\n",
			outdata.dsize, min_dsize);
		return -1;
	}
	max_ips = (outdata.dsize - min_dsize)/sizeof(struct ctdb_public_ip);
	ips = (struct ctdb_public_ip_list_old *)outdata.dptr;
	if ((size_t)ips->num > max_ips) {
		DBG_ERR("ips->num=%zu > max_ips=%zu\n",
			(size_t)ips->num, max_ips);
		return -1;
	}

	*_ips = ips;
	return 0;
}

int ctdbd_public_ip_foreach(struct ctdbd_connection *conn,
			    int (*cb)(uint32_t total_ip_count,
				      const struct sockaddr_storage *ip,
				      bool is_movable_ip,
				      void *private_data),
			    void *private_data)
{
	uint32_t i;
	struct ctdb_public_ip_list_old *ips = NULL;
	int ret = ENOMEM;
	TALLOC_CTX *frame = talloc_stackframe();

	ret = ctdbd_control_get_public_ips(conn, 0, frame, &ips);
	if (ret < 0) {
		ret = EIO;
		goto out_free;
	}

	for (i=0; i < ips->num; i++) {
		struct samba_sockaddr tmp = {
			.u = {
				.sa = ips->ips[i].addr.sa,
			},
		};

		ret = cb(ips->num,
			 &tmp.u.ss,
			 true, /* all ctdb public ips are movable */
			 private_data);
		if (ret != 0) {
			goto out_free;
		}
	}

	ret = 0;
out_free:
	TALLOC_FREE(frame);
	return ret;
}

/*
  call a control on the local node
*/
int ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
			uint64_t srvid, uint32_t flags, TDB_DATA data,
			TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			int32_t *cstatus)
{
	return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data,
			     mem_ctx, outdata, cstatus);
}

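/*
 * Illustrative sketch (not part of the original file): callers in this file
 * use ctdbd_control_local() like this, e.g. fetching our PNN the same way
 * get_cluster_vnn() does above. The variables are hypothetical.
 *
 *	int32_t cstatus = -1;
 *
 *	ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_PNN, 0, 0,
 *				  tdb_null, NULL, NULL, &cstatus);
 *	if (ret == 0) {
 *		our_pnn = (uint32_t)cstatus;
 *	}
 */
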
int ctdb_watch_us(struct ctdbd_connection *conn)
{
	struct ctdb_notify_data_old reg_data;
	size_t struct_len;
	int ret;
	int32_t cstatus;

	reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
	reg_data.len = 1;
	reg_data.notify_data[0] = 0;

	struct_len = offsetof(struct ctdb_notify_data_old,
			      notify_data) + reg_data.len;

	ret = ctdbd_control_local(
		conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
		make_tdb_data((uint8_t *)&reg_data, struct_len),
		NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control_local failed: %s\n",
			  strerror(ret)));
	}
	return ret;
}

int ctdb_unwatch(struct ctdbd_connection *conn)
{
	uint64_t srvid = CTDB_SRVID_SAMBA_NOTIFY;
	int ret;
	int32_t cstatus;

	ret = ctdbd_control_local(
		conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
		make_tdb_data((uint8_t *)&srvid, sizeof(srvid)),
		NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control_local failed: %s\n",
			  strerror(ret)));
	}
	return ret;
}

int ctdbd_probe(const char *sockname, int timeout)
{
	/*
	 * Do a very early check if ctdbd is around to avoid an abort and core
	 * later
	 */
	struct ctdbd_connection *conn = NULL;
	int ret;

	ret = ctdbd_init_connection(talloc_tos(), sockname, timeout,
				    &conn);

	/*
	 * We only care if we can connect.
	 */
	TALLOC_FREE(conn);

	return ret;
}

static int ctdbd_connection_destructor(struct ctdbd_connection *c)
{
	if (c->fd != -1) {
		close(c->fd);
		c->fd = -1;
	}
	return 0;
}

void ctdbd_prep_hdr_next_reqid(
	struct ctdbd_connection *conn, struct ctdb_req_header *hdr)
{
	*hdr = (struct ctdb_req_header) {
		.ctdb_magic = CTDB_MAGIC,
		.ctdb_version = CTDB_PROTOCOL,
		.reqid = ctdbd_next_reqid(conn),
		.destnode = CTDB_CURRENT_NODE,
	};
}

struct ctdbd_pkt_read_state {
	uint8_t *pkt;
};

static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data);
static void ctdbd_pkt_read_done(struct tevent_req *subreq);

static struct tevent_req *ctdbd_pkt_read_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
{
	struct tevent_req *req = NULL, *subreq = NULL;
	struct ctdbd_pkt_read_state *state = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
	if (req == NULL) {
		return NULL;
	}
	subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
	return req;
}

static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data)
{
	uint32_t msglen;
	if (buflen < 4) {
		return -1;
	}
	if (buflen > 4) {
		return 0;	/* Been here, done */
	}
	memcpy(&msglen, buf, 4);

	if (msglen < sizeof(struct ctdb_req_header)) {
		return -1;
	}
	return msglen - sizeof(msglen);
}

static void ctdbd_pkt_read_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_pkt_read_state *state = tevent_req_data(
		req, struct ctdbd_pkt_read_state);
	ssize_t nread;
	int err;

	nread = read_packet_recv(subreq, state, &state->pkt, &err);
	TALLOC_FREE(subreq);
	if (nread == -1) {
		tevent_req_error(req, err);
		return;
	}
	tevent_req_done(req);
}

static int ctdbd_pkt_read_recv(
	struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
{
	struct ctdbd_pkt_read_state *state = tevent_req_data(
		req, struct ctdbd_pkt_read_state);
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		return err;
	}
	*pkt = talloc_move(mem_ctx, &state->pkt);
	tevent_req_received(req);
	return 0;
}

static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
static void ctdbd_conn_received(struct tevent_req *subreq);

struct ctdbd_req_state {
	struct ctdbd_connection *conn;
	struct tevent_context *ev;
	uint32_t reqid;
	struct ctdb_req_header *reply;
};

static void ctdbd_req_unset_pending(struct tevent_req *req)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	struct ctdbd_connection *conn = state->conn;
	size_t num_pending = talloc_array_length(conn->pending);
	size_t i, num_after;

	tevent_req_set_cleanup_fn(req, NULL);

	if (num_pending == 1) {
		/*
		 * conn->read_req is a child of conn->pending
		 */
		TALLOC_FREE(conn->pending);
		conn->read_req = NULL;
		return;
	}

	for (i=0; i<num_pending; i++) {
		if (req == conn->pending[i]) {
			break;
		}
	}
	if (i == num_pending) {
		/*
		 * Something's seriously broken. Just returning here is the
		 * right thing nevertheless, the point of this routine is to
		 * remove ourselves from conn->pending.
		 */
		return;
	}

	num_after = num_pending - i - 1;
	if (num_after > 0) {
		memmove(&conn->pending[i],
			&conn->pending[i] + 1,
			sizeof(*conn->pending) * num_after);
	}
	conn->pending = talloc_realloc(
		NULL, conn->pending, struct tevent_req *, num_pending - 1);
}

static void ctdbd_req_cleanup(
	struct tevent_req *req, enum tevent_req_state req_state)
{
	ctdbd_req_unset_pending(req);
}

static bool ctdbd_req_set_pending(struct tevent_req *req)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	struct ctdbd_connection *conn = state->conn;
	struct tevent_req **pending = NULL;
	size_t num_pending = talloc_array_length(conn->pending);
	bool ok;

	pending = talloc_realloc(
		conn, conn->pending, struct tevent_req *, num_pending + 1);
	if (pending == NULL) {
		return false;
	}
	pending[num_pending] = req;
	conn->pending = pending;

	tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);

	ok = ctdbd_conn_receive_next(conn);
	if (!ok) {
		ctdbd_req_unset_pending(req);
		return false;
	}

	return true;
}

static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
{
	size_t num_pending = talloc_array_length(conn->pending);
	struct tevent_req *req = NULL;
	struct ctdbd_req_state *state = NULL;

	if (conn->read_req != NULL) {
		return true;
	}
	if (num_pending == 0) {
		/*
		 * done for now
		 */
		return true;
	}

	req = conn->pending[0];
	state = tevent_req_data(req, struct ctdbd_req_state);

	conn->read_req = ctdbd_pkt_read_send(
		conn->pending, state->ev, conn->fd);
	if (conn->read_req == NULL) {
		return false;
	}
	tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
	return true;
}

static void ctdbd_conn_received(struct tevent_req *subreq)
{
	struct ctdbd_connection *conn = tevent_req_callback_data(
		subreq, struct ctdbd_connection);
	TALLOC_CTX *frame = talloc_stackframe();
	uint8_t *pkt = NULL;
	int ret;
	struct ctdb_req_header *hdr = NULL;
	uint32_t reqid;
	struct tevent_req *req = NULL;
	struct ctdbd_req_state *state = NULL;
	size_t i, num_pending;
	bool ok;

	SMB_ASSERT(subreq == conn->read_req);
	conn->read_req = NULL;

	ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		cluster_fatal("ctdbd_pkt_read failed\n");
	}

	hdr = (struct ctdb_req_header *)pkt;
	reqid = hdr->reqid;
	num_pending = talloc_array_length(conn->pending);

	for (i=0; i<num_pending; i++) {
		req = conn->pending[i];
		state = tevent_req_data(req, struct ctdbd_req_state);
		if (state->reqid == reqid) {
			break;
		}
	}

	if (i == num_pending) {
		/* not found */
		TALLOC_FREE(frame);
		return;
	}

	state->reply = talloc_move(state, &hdr);
	tevent_req_defer_callback(req, state->ev);
	tevent_req_done(req);

	TALLOC_FREE(frame);

	ok = ctdbd_conn_receive_next(conn);
	if (!ok) {
		cluster_fatal("ctdbd_conn_receive_next failed\n");
	}
}

static void ctdbd_req_written(struct tevent_req *subreq);

struct tevent_req *ctdbd_req_send(
	TALLOC_CTX *mem_ctx,
	struct tevent_context *ev,
	struct ctdbd_connection *conn,
	struct iovec *iov,
	size_t num_iov)
{
	struct tevent_req *req = NULL, *subreq = NULL;
	struct ctdbd_req_state *state = NULL;
	struct ctdb_req_header *hdr = NULL;
	bool ok;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
	if (req == NULL) {
		return NULL;
	}
	state->conn = conn;
	state->ev = ev;

	if ((num_iov == 0) ||
	    (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
		tevent_req_error(req, EINVAL);
		return tevent_req_post(req, ev);
	}
	hdr = iov[0].iov_base;
	state->reqid = hdr->reqid;

	ok = ctdbd_req_set_pending(req);
	if (!ok) {
		tevent_req_oom(req);
		return tevent_req_post(req, ev);
	}

	subreq = writev_send(
		state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_req_written, req);

	return req;
}

static void ctdbd_req_written(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	ssize_t nwritten;
	int err;

	nwritten = writev_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (nwritten == -1) {
		tevent_req_error(req, err);
		return;
	}
}

int ctdbd_req_recv(
	struct tevent_req *req,
	TALLOC_CTX *mem_ctx,
	struct ctdb_req_header **reply)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		return err;
	}
	*reply = talloc_move(mem_ctx, &state->reply);
	tevent_req_received(req);
	return 0;
}

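/*
 * Illustrative sketch (not part of the original file): an async caller on a
 * connection from ctdbd_init_async_connection() fills in the request header
 * with ctdbd_prep_hdr_next_reqid(), sends the iovecs with ctdbd_req_send()
 * and picks up the matching reply with ctdbd_req_recv() in its tevent
 * callback. The variable names below are hypothetical and the request setup
 * is elided.
 *
 *	ctdbd_prep_hdr_next_reqid(conn, &ctrl.hdr);
 *	subreq = ctdbd_req_send(state, ev, conn, iov, num_iov);
 *	tevent_req_set_callback(subreq, example_done, req);
 *	...
 *	ret = ctdbd_req_recv(subreq, state, &reply);
 */
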
struct ctdbd_parse_state {
	struct tevent_context *ev;
	struct ctdbd_connection *conn;
	uint32_t reqid;
	TDB_DATA key;
	uint8_t _keybuf[64];
	struct ctdb_req_call_old ctdb_req;
	struct iovec iov[2];
	void (*parser)(TDB_DATA key,
		       TDB_DATA data,
		       void *private_data);
	void *private_data;
};

static void ctdbd_parse_done(struct tevent_req *subreq);

struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdbd_connection *conn,
				    uint32_t db_id,
				    TDB_DATA key,
				    bool local_copy,
				    void (*parser)(TDB_DATA key,
						   TDB_DATA data,
						   void *private_data),
				    void *private_data,
				    enum dbwrap_req_state *req_state)
{
	struct tevent_req *req = NULL;
	struct ctdbd_parse_state *state = NULL;
	uint32_t flags;
	uint32_t packet_length;
	struct tevent_req *subreq = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
	if (req == NULL) {
		*req_state = DBWRAP_REQ_ERROR;
		return NULL;
	}

	*req_state = DBWRAP_REQ_DISPATCHED;

	*state = (struct ctdbd_parse_state) {
		.ev = ev,
		.conn = conn,
		.reqid = ctdbd_next_reqid(conn),
		.parser = parser,
		.private_data = private_data,
	};

	flags = local_copy ? CTDB_WANT_READONLY : 0;
	packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;

	/*
	 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires
	 * that all passed iov elements have a lifetime longer than the
	 * tevent_req returned by ctdb_pkt_send_send(). This is required to
	 * keep sending the low-level request into the ctdb socket without
	 * emitting invalid packets, even if a higher-level ('this') request
	 * is canceled (or talloc free'd) by the application layer.
	 */
	if (key.dsize > sizeof(state->_keybuf)) {
		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
		if (tevent_req_nomem(state->key.dptr, req)) {
			return tevent_req_post(req, ev);
		}
	} else {
		memcpy(state->_keybuf, key.dptr, key.dsize);
		state->key.dptr = state->_keybuf;
	}
	state->key.dsize = key.dsize;

	state->ctdb_req.hdr.length = packet_length;
	state->ctdb_req.hdr.ctdb_magic = CTDB_MAGIC;
	state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
	state->ctdb_req.hdr.operation = CTDB_REQ_CALL;
	state->ctdb_req.hdr.reqid = state->reqid;
	state->ctdb_req.flags = flags;
	state->ctdb_req.callid = CTDB_FETCH_FUNC;
	state->ctdb_req.db_id = db_id;
	state->ctdb_req.keylen = state->key.dsize;

	state->iov[0].iov_base = &state->ctdb_req;
	state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	state->iov[1].iov_base = state->key.dptr;
	state->iov[1].iov_len = state->key.dsize;

	subreq = ctdbd_req_send(
		state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
	if (tevent_req_nomem(subreq, req)) {
		*req_state = DBWRAP_REQ_ERROR;
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_parse_done, req);

	return req;
}

static void ctdbd_parse_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_parse_state *state = tevent_req_data(
		req, struct ctdbd_parse_state);
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply = NULL;
	int ret;

	ret = ctdbd_req_recv(subreq, state, &hdr);
	TALLOC_FREE(subreq);
	if (tevent_req_error(req, ret)) {
		DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
		return;
	}
	SMB_ASSERT(hdr != NULL);

	if (hdr->operation != CTDB_REPLY_CALL) {
		DBG_ERR("received invalid reply\n");
		ctdb_packet_dump(hdr);
		tevent_req_error(req, EIO);
		return;
	}

	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		tevent_req_error(req, ENOENT);
		return;
	}

	state->parser(state->key,
		      make_tdb_data(&reply->data[0], reply->datalen),
		      state->private_data);

	tevent_req_done(req);
	return;
}

int ctdbd_parse_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
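/*
 * Illustrative sketch (not part of the original file): the async fetch pairs
 * ctdbd_parse_send() with ctdbd_parse_recv(), mirroring the synchronous
 * ctdbd_parse() above. The callback names below are hypothetical.
 *
 *	subreq = ctdbd_parse_send(state, ev, conn, db_id, key, false,
 *				  example_parser, state, &req_state);
 *	tevent_req_set_callback(subreq, example_parse_done, req);
 *
 *	static void example_parse_done(struct tevent_req *subreq)
 *	{
 *		int ret = ctdbd_parse_recv(subreq);
 *		TALLOC_FREE(subreq);
 *		...
 *	}
 */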