/*
   Unix SMB/CIFS implementation.
   Samba internal messaging functions
   Copyright (C) 2007 by Volker Lendecke
   Copyright (C) 2007 by Andrew Tridgell

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include <tevent.h>
#include "util_tdb.h"
#include "serverid.h"
#include "ctdbd_conn.h"
#include "system/select.h"
#include "lib/util/util_net.h"
#include "lib/util/sys_rw_data.h"
#include "lib/util/iov_buf.h"
#include "lib/util/select.h"
#include "lib/util/debug.h"
#include "lib/util/talloc_stack.h"
#include "lib/util/genrand.h"
#include "lib/util/fault.h"
#include "lib/util/dlinklist.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/sys_rw.h"
#include "lib/util/blocking.h"
#include "ctdb/include/ctdb_protocol.h"
#include "lib/async_req/async_sock.h"

/* paths to these include files come from --with-ctdb= in configure */
struct ctdbd_srvid_cb {
	uint64_t srvid;
	int (*cb)(struct tevent_context *ev,
		  uint32_t src_vnn, uint32_t dst_vnn,
		  uint64_t dst_srvid,
		  const uint8_t *msg, size_t msglen,
		  void *private_data);
	void *private_data;
};

struct ctdbd_connection {
	uint32_t reqid;
	uint32_t our_vnn;
	uint64_t rand_srvid;
	struct ctdbd_srvid_cb *callbacks;
	int fd;
	int timeout;

	/*
	 * Outgoing queue for writev_send of asynchronous ctdb requests
	 */
	struct tevent_queue *outgoing;
	struct tevent_req **pending;
	struct tevent_req *read_req;
};
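
/*
 * Helpers for the connection state: an async request is in flight as long as
 * conn->pending is non-empty, and reqids are handed out sequentially,
 * skipping 0 (ctdb_read_req() treats a reqid of 0 as "match any reply").
 */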
static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
{
	size_t len = talloc_array_length(conn->pending);
	return (len != 0);
}

static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
{
	conn->reqid += 1;
	if (conn->reqid == 0) {
		conn->reqid += 1;
	}
	return conn->reqid;
}

static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn, uint32_t opcode,
			 uint64_t srvid, uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			 int32_t *cstatus);

/*
 * exit on fatal communications errors with the ctdbd daemon
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
	/* we don't use smb_panic() as we don't want to delay to write
	   a core file. We need to release this process id immediately
	   so that someone else can take over without getting sharing
	   violations */
	_exit(1);
}

static void ctdb_packet_dump(struct ctdb_req_header *hdr)
{
	if (DEBUGLEVEL < 11) {
		return;
	}
	DEBUGADD(11, ("len=%"PRIu32", magic=%"PRIu32", vers=%"PRIu32", "
		      "gen=%"PRIu32", op=%"PRIu32", reqid=%"PRIu32"\n",
		      hdr->length,
		      hdr->ctdb_magic,
		      hdr->ctdb_version,
		      hdr->generation,
		      hdr->operation,
		      hdr->reqid));
}
/*
 * Register a srvid with ctdbd
 */
int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
			int (*cb)(struct tevent_context *ev,
				  uint32_t src_vnn, uint32_t dst_vnn,
				  uint64_t dst_srvid,
				  const uint8_t *msg, size_t msglen,
				  void *private_data),
			void *private_data)
{
	int ret;
	int32_t cstatus;
	size_t num_callbacks;
	struct ctdbd_srvid_cb *tmp;

	ret = ctdbd_control_local(conn, CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
				  tdb_null, NULL, NULL, &cstatus);
	if (ret != 0) {
		return ret;
	}

	num_callbacks = talloc_array_length(conn->callbacks);

	tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
			     num_callbacks + 1);
	if (tmp == NULL) {
		return ENOMEM;
	}
	conn->callbacks = tmp;

	conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
		.srvid = srvid, .cb = cb, .private_data = private_data
	};

	return 0;
}
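
/*
 * Dispatch an incoming CTDB_REQ_MESSAGE to all callbacks registered for its
 * srvid via register_with_ctdbd().
 */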
static int ctdbd_msg_call_back(struct tevent_context *ev,
			       struct ctdbd_connection *conn,
			       struct ctdb_req_message_old *msg)
{
	uint32_t msg_len;
	size_t i, num_callbacks;

	msg_len = msg->hdr.length;
	if (msg_len < offsetof(struct ctdb_req_message_old, data)) {
		DBG_DEBUG("len %"PRIu32" too small\n", msg_len);
		return 0;
	}
	msg_len -= offsetof(struct ctdb_req_message_old, data);

	if (msg_len < msg->datalen) {
		DBG_DEBUG("msg_len=%"PRIu32" < msg->datalen=%"PRIu32"\n",
			  msg_len, msg->datalen);
		return 0;
	}

	num_callbacks = talloc_array_length(conn->callbacks);

	for (i=0; i<num_callbacks; i++) {
		struct ctdbd_srvid_cb *cb = &conn->callbacks[i];

		if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
			int ret;

			ret = cb->cb(ev,
				     msg->hdr.srcnode, msg->hdr.destnode,
				     msg->srvid, msg->data, msg->datalen,
				     cb->private_data);
			if (ret != 0) {
				return ret;
			}
		}
	}
	return 0;
}

/*
 * get our vnn from the cluster
 */
static int get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
{
	int32_t cstatus=-1;
	int ret;
	ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_PNN, 0, 0,
				  tdb_null, NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
		return ret;
	}
	*vnn = (uint32_t)cstatus;
	return ret;
}

/*
 * Are we active (i.e. not banned or stopped?)
 */
static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
{
	int32_t cstatus=-1;
	TDB_DATA outdata = {0};
	struct ctdb_node_map_old *m;
	bool ok = false;
	uint32_t i;
	int ret;

	ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_NODEMAP, 0, 0,
				  tdb_null, talloc_tos(), &outdata, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
		return false;
	}
	if ((cstatus != 0) || (outdata.dptr == NULL)) {
		DEBUG(2, ("Received invalid ctdb data\n"));
		goto fail;
	}

	m = (struct ctdb_node_map_old *)outdata.dptr;

	for (i=0; i<m->num; i++) {
		if (vnn == m->nodes[i].pnn) {
			break;
		}
	}

	if (i == m->num) {
		DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
			  (int)vnn));
		goto fail;
	}

	if ((m->nodes[i].flags & NODE_FLAGS_INACTIVE) != 0) {
		DEBUG(2, ("Node has status %x, not active\n",
			  (int)m->nodes[i].flags));
		goto fail;
	}

	ok = true;
fail:
	TALLOC_FREE(outdata.dptr);
	return ok;
}
uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
{
	return conn->our_vnn;
}

/*
 * Get us a ctdb connection
 */

static int ctdbd_connect(const char *sockname, int *pfd)
{
	struct samba_sockaddr addr = {
		.sa_socklen = sizeof(struct sockaddr_un),
		.u = {
			.un = {
				.sun_family = AF_UNIX,
			},
		},
	};
	int fd;
	size_t namelen;
	int ret;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd == -1) {
		int err = errno;
		DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
		return err;
	}

	namelen = strlcpy(addr.u.un.sun_path,
			  sockname,
			  sizeof(addr.u.un.sun_path));
	if (namelen >= sizeof(addr.u.un.sun_path)) {
		DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
			  sockname));
		close(fd);
		return ENAMETOOLONG;
	}

	ret = connect(fd, &addr.u.sa, addr.sa_socklen);
	if (ret == -1) {
		int err = errno;
		DEBUG(1, ("connect(%s) failed: %s\n", sockname,
			  strerror(err)));
		close(fd);
		return err;
	}

	*pfd = fd;
	return 0;
}
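
/*
 * Read one ctdb packet from the socket: the 4-byte length field first, then
 * the rest of the packet. A timeout of -1 blocks indefinitely.
 */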
static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
			    struct ctdb_req_header **result)
{
	struct ctdb_req_header *req;
	uint32_t msglen;
	ssize_t nread;

	if (timeout != -1) {
		struct pollfd pfd = { .fd = fd, .events = POLLIN };
		int ret;

		ret = sys_poll_intr(&pfd, 1, timeout);
		if (ret == -1) {
			return errno;
		}
		if (ret == 0) {
			return ETIMEDOUT;
		}
		if (ret != 1) {
			return EIO;
		}
	}

	nread = read_data(fd, &msglen, sizeof(msglen));
	if (nread == -1) {
		return errno;
	}
	if (nread == 0) {
		return EIO;
	}

	if (msglen < sizeof(struct ctdb_req_header)) {
		return EIO;
	}

	req = talloc_size(mem_ctx, msglen);
	if (req == NULL) {
		return ENOMEM;
	}
	talloc_set_name_const(req, "struct ctdb_req_header");

	req->length = msglen;

	nread = read_data(fd, ((char *)req) + sizeof(msglen),
			  msglen - sizeof(msglen));
	if (nread == -1) {
		TALLOC_FREE(req);
		return errno;
	}
	if (nread == 0) {
		TALLOC_FREE(req);
		return EIO;
	}

	*result = req;
	return 0;
}

/*
 * Read a full ctdbd request. If we have a messaging context, defer incoming
 * messages that might come in between.
 */

static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
			 TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
{
	struct ctdb_req_header *hdr = NULL;
	int ret;

 next_pkt:

	ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
	if (ret != 0) {
		DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
		cluster_fatal("failed to read data from ctdbd\n");
		return -1;
	}
	SMB_ASSERT(hdr != NULL);

	DEBUG(11, ("Received ctdb packet\n"));
	ctdb_packet_dump(hdr);

	if (hdr->operation == CTDB_REQ_MESSAGE) {
		struct ctdb_req_message_old *msg = (struct ctdb_req_message_old *)hdr;

		ret = ctdbd_msg_call_back(NULL, conn, msg);
		if (ret != 0) {
			TALLOC_FREE(hdr);
			return ret;
		}

		TALLOC_FREE(hdr);
		goto next_pkt;
	}

	if ((reqid != 0) && (hdr->reqid != reqid)) {
		/* we got the wrong reply */
		DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
			 "been %u\n", hdr->reqid, reqid));
		TALLOC_FREE(hdr);
		goto next_pkt;
	}

	*result = talloc_move(mem_ctx, &hdr);

	return 0;
}

static int ctdbd_connection_destructor(struct ctdbd_connection *c);

/*
 * Get us a ctdbd connection
 */

static int ctdbd_init_connection_internal(TALLOC_CTX *mem_ctx,
					  const char *sockname, int timeout,
					  struct ctdbd_connection *conn)
{
	int ret;

	conn->timeout = timeout;
	if (conn->timeout == 0) {
		conn->timeout = -1;
	}

	ret = ctdbd_connect(sockname, &conn->fd);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
		return ret;
	}
	talloc_set_destructor(conn, ctdbd_connection_destructor);

	ret = get_cluster_vnn(conn, &conn->our_vnn);
	if (ret != 0) {
		DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret)));
		return ret;
	}

	if (!ctdbd_working(conn, conn->our_vnn)) {
		DEBUG(2, ("Node is not working, can not connect\n"));
		return EIO;
	}

	generate_random_buffer((unsigned char *)&conn->rand_srvid,
			       sizeof(conn->rand_srvid));

	ret = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
	if (ret != 0) {
		DEBUG(5, ("Could not register random srvid: %s\n",
			  strerror(ret)));
		return ret;
	}

	return 0;
}
int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
			  const char *sockname, int timeout,
			  struct ctdbd_connection **pconn)
{
	struct ctdbd_connection *conn;
	int ret;

	if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
		DEBUG(0, ("talloc failed\n"));
		return ENOMEM;
	}

	ret = ctdbd_init_connection_internal(mem_ctx,
					     sockname,
					     timeout,
					     conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
			strerror(ret));
		goto fail;
	}

	*pconn = conn;
	return 0;

 fail:
	TALLOC_FREE(conn);
	return ret;
}

int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
			    const char *sockname, int timeout,
			    struct ctdbd_connection *conn)
{
	int ret;

	ret = ctdbd_connection_destructor(conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_connection_destructor failed\n");
		return ret;
	}

	ret = ctdbd_init_connection_internal(mem_ctx,
					     sockname,
					     timeout,
					     conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
			strerror(ret));
		return ret;
	}

	return 0;
}
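
/*
 * Variant of ctdbd_init_connection() for async use: the socket is switched
 * to non-blocking mode and an outgoing tevent_queue serializes writes.
 */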
int ctdbd_init_async_connection(
	TALLOC_CTX *mem_ctx,
	const char *sockname,
	int timeout,
	struct ctdbd_connection **pconn)
{
	struct ctdbd_connection *conn = NULL;
	int ret;

	ret = ctdbd_init_connection(mem_ctx, sockname, timeout, &conn);
	if (ret != 0) {
		return ret;
	}

	ret = set_blocking(conn->fd, false);
	if (ret == -1) {
		int err = errno;
		TALLOC_FREE(conn);
		return err;
	}

	conn->outgoing = tevent_queue_create(conn, "ctdb async outgoing");
	if (conn->outgoing == NULL) {
		TALLOC_FREE(conn);
		return ENOMEM;
	}

	*pconn = conn;
	return 0;
}

int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
{
	return conn->fd;
}

/*
 * Packet handler to receive and handle a ctdb message
 */
static int ctdb_handle_message(struct tevent_context *ev,
			       struct ctdbd_connection *conn,
			       struct ctdb_req_header *hdr)
{
	struct ctdb_req_message_old *msg;

	if (hdr->operation != CTDB_REQ_MESSAGE) {
		DEBUG(0, ("Received async msg of type %u, discarding\n",
			  hdr->operation));
		return EINVAL;
	}

	msg = (struct ctdb_req_message_old *)hdr;

	ctdbd_msg_call_back(ev, conn, msg);

	return 0;
}
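
/*
 * To be called from the caller's event loop when conn->fd becomes readable:
 * reads exactly one packet and feeds it to the registered srvid callbacks.
 */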
void ctdbd_socket_readable(struct tevent_context *ev,
			   struct ctdbd_connection *conn)
{
	struct ctdb_req_header *hdr = NULL;
	int ret;

	ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
	if (ret != 0) {
		DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
		cluster_fatal("failed to read data from ctdbd\n");
	}
	SMB_ASSERT(hdr != NULL);

	ret = ctdb_handle_message(ev, conn, hdr);

	TALLOC_FREE(hdr);

	if (ret != 0) {
		DEBUG(10, ("could not handle incoming message: %s\n",
			   strerror(ret)));
	}
}
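
/*
 * Send a CTDB_REQ_MESSAGE to dst_vnn/dst_srvid, with the payload given as an
 * iovec array. The message header is prepended as an additional iovec.
 */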
int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
			     uint32_t dst_vnn, uint64_t dst_srvid,
			     const struct iovec *iov, int iovlen)
{
	struct ctdb_req_message_old r;
	struct iovec iov2[iovlen+1];
	size_t buflen = iov_buflen(iov, iovlen);
	ssize_t nwritten;

	r.hdr.length = offsetof(struct ctdb_req_message_old, data) + buflen;
	r.hdr.ctdb_magic = CTDB_MAGIC;
	r.hdr.ctdb_version = CTDB_PROTOCOL;
	r.hdr.generation = 1;
	r.hdr.operation = CTDB_REQ_MESSAGE;
	r.hdr.destnode = dst_vnn;
	r.hdr.srcnode = conn->our_vnn;
	r.hdr.reqid = 0;
	r.srvid = dst_srvid;
	r.datalen = buflen;

	DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
	ctdb_packet_dump(&r.hdr);

	iov2[0].iov_base = &r;
	iov2[0].iov_len = offsetof(struct ctdb_req_message_old, data);
	memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));

	nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	return 0;
}

/*
 * send/recv a generic ctdb control message
 */
static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn, uint32_t opcode,
			 uint64_t srvid, uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			 int32_t *cstatus)
{
	struct ctdb_req_control_old req;
	struct ctdb_req_header *hdr;
	struct ctdb_reply_control_old *reply = NULL;
	struct iovec iov[2];
	ssize_t nwritten;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	ZERO_STRUCT(req);
	req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CONTROL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.hdr.destnode = vnn;
	req.opcode = opcode;
	req.srvid = srvid;
	req.datalen = data.dsize;
	req.flags = flags;

	DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32", vnn=%"PRIu32", "
		  "opcode=%"PRIu32", srvid=%"PRIu64"\n", req.hdr.reqid,
		  req.hdr.destnode, req.opcode, req.srvid);
	ctdb_packet_dump(&req.hdr);

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_control_old, data);
	iov[1].iov_base = data.dptr;
	iov[1].iov_len = data.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	if (flags & CTDB_CTRL_FLAG_NOREPLY) {
		if (cstatus) {
			*cstatus = 0;
		}
		return 0;
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		return ret;
	}

	if (hdr->operation != CTDB_REPLY_CONTROL) {
		DEBUG(0, ("received invalid reply\n"));
		TALLOC_FREE(hdr);
		return EIO;
	}
	reply = (struct ctdb_reply_control_old *)hdr;

	if (outdata) {
		if (!(outdata->dptr = (uint8_t *)talloc_memdup(
			      mem_ctx, reply->data, reply->datalen))) {
			TALLOC_FREE(reply);
			return ENOMEM;
		}
		outdata->dsize = reply->datalen;
	}
	if (cstatus) {
		(*cstatus) = reply->status;
	}

	TALLOC_FREE(reply);
	return ret;
}
/*
 * see if a remote process exists
 */
bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn,
			  pid_t pid, uint64_t unique_id)
{
	uint8_t buf[sizeof(pid)+sizeof(unique_id)];
	int32_t cstatus = 0;
	int ret;

	if (unique_id == SERVERID_UNIQUE_ID_NOT_TO_VERIFY) {
		ret = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS,
				    0, 0,
				    (TDB_DATA) { .dptr = (uint8_t *)&pid,
						 .dsize = sizeof(pid) },
				    NULL, NULL, &cstatus);
		if (ret != 0) {
			return false;
		}
		return (cstatus == 0);
	}

	memcpy(buf, &pid, sizeof(pid));
	memcpy(buf+sizeof(pid), &unique_id, sizeof(unique_id));

	ret = ctdbd_control(conn, vnn, CTDB_CONTROL_CHECK_PID_SRVID, 0, 0,
			    (TDB_DATA) { .dptr = buf, .dsize = sizeof(buf) },
			    NULL, NULL, &cstatus);
	if (ret != 0) {
		return false;
	}
	return (cstatus == 0);
}

/*
 * Get a db path
 */
char *ctdbd_dbpath(struct ctdbd_connection *conn,
		   TALLOC_CTX *mem_ctx, uint32_t db_id)
{
	int ret;
	TDB_DATA data;
	TDB_DATA rdata = {0};
	int32_t cstatus = 0;

	data.dptr = (uint8_t*)&db_id;
	data.dsize = sizeof(db_id);

	ret = ctdbd_control_local(conn, CTDB_CONTROL_GETDBPATH, 0, 0, data,
				  mem_ctx, &rdata, &cstatus);
	if ((ret != 0) || cstatus != 0) {
		DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s\n",
			  strerror(ret)));
		TALLOC_FREE(rdata.dptr);
	}

	return (char *)rdata.dptr;
}

/*
 * attach to a ctdb database
 */
int ctdbd_db_attach(struct ctdbd_connection *conn,
		    const char *name, uint32_t *db_id, bool persistent)
{
	int ret;
	TDB_DATA data = {0};
	int32_t cstatus;

	data = string_term_tdb_data(name);

	ret = ctdbd_control_local(conn,
				  persistent
				  ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
				  : CTDB_CONTROL_DB_ATTACH,
				  0, 0, data, NULL, &data, &cstatus);
	if (ret != 0) {
		DEBUG(0, (__location__ " ctdb_control for db_attach "
			  "failed: %s\n", strerror(ret)));
		return ret;
	}

	if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
		DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
		TALLOC_FREE(data.dptr);
		return EIO;
	}

	*db_id = *(uint32_t *)data.dptr;
	talloc_free(data.dptr);

	return 0;
}
/*
 * force the migration of a record to this node
 */
int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
{
	struct ctdb_req_call_old req;
	struct ctdb_req_header *hdr = NULL;
	struct iovec iov[2];
	ssize_t nwritten;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	ZERO_STRUCT(req);

	req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CALL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.flags = CTDB_IMMEDIATE_MIGRATION;
	req.callid = CTDB_NULL_FUNC;
	req.db_id = db_id;
	req.keylen = key.dsize;

	DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
	ctdb_packet_dump(&req.hdr);

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	iov[1].iov_base = key.dptr;
	iov[1].iov_len = key.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		goto fail;
	}

	if (hdr->operation != CTDB_REPLY_CALL) {
		if (hdr->operation == CTDB_REPLY_ERROR) {
			DBG_ERR("received error from ctdb\n");
		} else {
			DBG_ERR("received invalid reply\n");
		}
		ret = EIO;
		goto fail;
	}

 fail:

	TALLOC_FREE(hdr);
	return ret;
}
/*
 * Fetch a record and parse it
 */
int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
		TDB_DATA key, bool local_copy,
		void (*parser)(TDB_DATA key, TDB_DATA data,
			       void *private_data),
		void *private_data)
{
	struct ctdb_req_call_old req;
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply;
	struct iovec iov[2];
	ssize_t nwritten;
	uint32_t flags;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	flags = local_copy ? CTDB_WANT_READONLY : 0;

	ZERO_STRUCT(req);

	req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CALL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.flags = flags;
	req.callid = CTDB_FETCH_FUNC;
	req.db_id = db_id;
	req.keylen = key.dsize;

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	iov[1].iov_base = key.dptr;
	iov[1].iov_len = key.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		goto fail;
	}

	if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
		DEBUG(0, ("received invalid reply\n"));
		ret = EIO;
		goto fail;
	}
	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		ret = ENOENT;
		goto fail;
	}

	parser(key, make_tdb_data(&reply->data[0], reply->datalen),
	       private_data);

	ret = 0;
fail:
	TALLOC_FREE(hdr);
	return ret;
}
/*
  Traverse a ctdb database. "conn" must be an otherwise unused
  ctdb_connection where no other messages but the traverse ones are
  expected.
*/

int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
		   void (*fn)(TDB_DATA key, TDB_DATA data,
			      void *private_data),
		   void *private_data)
{
	int ret;
	TDB_DATA key, data;
	struct ctdb_traverse_start t;
	int32_t cstatus = 0;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	t.db_id = db_id;
	t.srvid = conn->rand_srvid;
	t.reqid = ctdbd_next_reqid(conn);

	data.dptr = (uint8_t *)&t;
	data.dsize = sizeof(t);

	ret = ctdbd_control_local(conn, CTDB_CONTROL_TRAVERSE_START,
				  conn->rand_srvid,
				  0, data, NULL, NULL, &cstatus);

	if ((ret != 0) || (cstatus != 0)) {
		DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret),
			 cstatus));

		if (ret == 0) {
			/*
			 * We need a mapping here
			 */
			ret = EIO;
		}
		return ret;
	}

	while (true) {
		struct ctdb_req_header *hdr = NULL;
		struct ctdb_req_message_old *m;
		struct ctdb_rec_data_old *d;

		ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
		if (ret != 0) {
			DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
			cluster_fatal("failed to read data from ctdbd\n");
		}
		SMB_ASSERT(hdr != NULL);

		if (hdr->operation != CTDB_REQ_MESSAGE) {
			DEBUG(0, ("Got operation %u, expected a message\n",
				  (unsigned)hdr->operation));
			return EIO;
		}

		m = (struct ctdb_req_message_old *)hdr;
		d = (struct ctdb_rec_data_old *)&m->data[0];
		if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
			DEBUG(0, ("Got invalid traverse data of length %d\n",
				  (int)m->datalen));
			return EIO;
		}

		key.dsize = d->keylen;
		key.dptr  = &d->data[0];
		data.dsize = d->datalen;
		data.dptr = &d->data[d->keylen];

		if (key.dsize == 0 && data.dsize == 0) {
			/* end of traverse */
			return 0;
		}

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(0, ("Got invalid ltdb header length %d\n",
				  (int)data.dsize));
			return EIO;
		}
		data.dsize -= sizeof(struct ctdb_ltdb_header);
		data.dptr += sizeof(struct ctdb_ltdb_header);

		if (fn != NULL) {
			fn(key, data, private_data);
		}
	}
	return 0;
}
/*
  This is used to canonicalize a ctdb_sock_addr structure.
*/
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)out;
		if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			out4->sin_len = sizeof(*out);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port = in6->sin6_port;
			memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
		}
	}
#endif
}

/*
 * Register us as a server for a particular tcp connection
 */

int ctdbd_register_ips(struct ctdbd_connection *conn,
		       const struct sockaddr_storage *_server,
		       const struct sockaddr_storage *_client,
		       int (*cb)(struct tevent_context *ev,
				 uint32_t src_vnn, uint32_t dst_vnn,
				 uint64_t dst_srvid,
				 const uint8_t *msg, size_t msglen,
				 void *private_data),
		       void *private_data)
{
	struct ctdb_connection p;
	TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
	int ret;
	struct sockaddr_storage client;
	struct sockaddr_storage server;

	/*
	 * Only one connection so far
	 */

	smbd_ctdb_canonicalize_ip(_client, &client);
	smbd_ctdb_canonicalize_ip(_server, &server);

	switch (client.ss_family) {
	case AF_INET:
		memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
		memcpy(&p.src.ip, &client, sizeof(p.src.ip));
		break;
	case AF_INET6:
		memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
		memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
		break;
	default:
		return EIO;
	}

	/*
	 * We want to be told about IP releases
	 */

	ret = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
				  cb, private_data);
	if (ret != 0) {
		return ret;
	}

	/*
	 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
	 * can send an extra ack to trigger a reset for our client, so it
	 * immediately reconnects
	 */
	ret = ctdbd_control_local(conn,
				  CTDB_CONTROL_TCP_CLIENT, 0,
				  CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
				  NULL);
	if (ret != 0) {
		return ret;
	}
	return 0;
}
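
/*
 * Fetch the list of public IPs from ctdbd and sanity-check the reply before
 * handing it to the caller.
 */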
static int ctdbd_control_get_public_ips(struct ctdbd_connection *conn,
					uint32_t flags,
					TALLOC_CTX *mem_ctx,
					struct ctdb_public_ip_list_old **_ips)
{
	struct ctdb_public_ip_list_old *ips = NULL;
	TDB_DATA outdata;
	int32_t cstatus = -1;
	size_t min_dsize;
	size_t max_ips;
	int ret;

	*_ips = NULL;

	ret = ctdbd_control_local(conn,
				  CTDB_CONTROL_GET_PUBLIC_IPS,
				  0, /* srvid */
				  flags,
				  tdb_null, /* indata */
				  mem_ctx,
				  &outdata,
				  &cstatus);
	if (ret != 0 || cstatus != 0) {
		DBG_ERR("ctdb_control for getpublicips failed ret:%d cstatus:%d\n",
			ret, (int)cstatus);
		return -1;
	}

	min_dsize = offsetof(struct ctdb_public_ip_list_old, ips);
	if (outdata.dsize < min_dsize) {
		DBG_ERR("outdata.dsize=%zu < min_dsize=%zu\n",
			outdata.dsize, min_dsize);
		return -1;
	}
	max_ips = (outdata.dsize - min_dsize)/sizeof(struct ctdb_public_ip);
	ips = (struct ctdb_public_ip_list_old *)outdata.dptr;
	if ((size_t)ips->num > max_ips) {
		DBG_ERR("ips->num=%zu > max_ips=%zu\n",
			(size_t)ips->num, max_ips);
		return -1;
	}

	*_ips = ips;
	return 0;
}
int ctdbd_public_ip_foreach(struct ctdbd_connection *conn,
			    int (*cb)(uint32_t total_ip_count,
				      const struct sockaddr_storage *ip,
				      bool is_movable_ip,
				      void *private_data),
			    void *private_data)
{
	uint32_t i;
	struct ctdb_public_ip_list_old *ips = NULL;
	int ret = ENOMEM;
	TALLOC_CTX *frame = talloc_stackframe();

	ret = ctdbd_control_get_public_ips(conn, 0, frame, &ips);
	if (ret < 0) {
		ret = EIO;
		goto out_free;
	}

	for (i=0; i < ips->num; i++) {
		struct samba_sockaddr tmp = {
			.u = {
				.sa = ips->ips[i].addr.sa,
			},
		};

		ret = cb(ips->num,
			 &tmp.u.ss,
			 true, /* all ctdb public ips are movable */
			 private_data);
		if (ret != 0) {
			goto out_free;
		}
	}

	ret = 0;
out_free:
	TALLOC_FREE(frame);
	return ret;
}
/*
  call a control on the local node
*/
int ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
			uint64_t srvid, uint32_t flags, TDB_DATA data,
			TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			int32_t *cstatus)
{
	return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data,
			     mem_ctx, outdata, cstatus);
}
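
/*
 * Register/deregister a CTDB_SRVID_SAMBA_NOTIFY notification with ctdbd
 * (ctdb_watch_us() arms it, ctdb_unwatch() removes it again).
 */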
int ctdb_watch_us(struct ctdbd_connection *conn)
{
	struct ctdb_notify_data_old reg_data;
	size_t struct_len;
	int ret;
	int32_t cstatus;

	reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
	reg_data.len = 1;
	reg_data.notify_data[0] = 0;

	struct_len = offsetof(struct ctdb_notify_data_old,
			      notify_data) + reg_data.len;

	ret = ctdbd_control_local(
		conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
		make_tdb_data((uint8_t *)&reg_data, struct_len),
		NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control_local failed: %s\n",
			  strerror(ret)));
	}
	return ret;
}

int ctdb_unwatch(struct ctdbd_connection *conn)
{
	uint64_t srvid = CTDB_SRVID_SAMBA_NOTIFY;
	int ret;
	int32_t cstatus;

	ret = ctdbd_control_local(
		conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
		make_tdb_data((uint8_t *)&srvid, sizeof(srvid)),
		NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control_local failed: %s\n",
			  strerror(ret)));
	}
	return ret;
}

int ctdbd_probe(const char *sockname, int timeout)
{
	/*
	 * Do a very early check if ctdbd is around to avoid an abort and core
	 * later
	 */
	struct ctdbd_connection *conn = NULL;
	int ret;

	ret = ctdbd_init_connection(talloc_tos(), sockname, timeout,
				    &conn);

	/*
	 * We only care if we can connect.
	 */
	TALLOC_FREE(conn);

	return ret;
}
static int ctdbd_connection_destructor(struct ctdbd_connection *c)
{
	if (c->fd != -1) {
		close(c->fd);
		c->fd = -1;
	}
	return 0;
}
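
/*
 * Fill in a request header for the local node with a fresh reqid; intended
 * for callers that build their own packets for ctdbd_req_send().
 */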
void ctdbd_prep_hdr_next_reqid(
	struct ctdbd_connection *conn, struct ctdb_req_header *hdr)
{
	*hdr = (struct ctdb_req_header) {
		.ctdb_magic = CTDB_MAGIC,
		.ctdb_version = CTDB_PROTOCOL,
		.reqid = ctdbd_next_reqid(conn),
		.destnode = CTDB_CURRENT_NODE,
	};
}
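
/*
 * Async packet reader: read_packet_send() collects the 4-byte length field
 * and then, via ctdbd_pkt_read_more(), the remainder of the packet.
 */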
struct ctdbd_pkt_read_state {
	uint8_t *pkt;
};

static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data);
static void ctdbd_pkt_read_done(struct tevent_req *subreq);

static struct tevent_req *ctdbd_pkt_read_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
{
	struct tevent_req *req = NULL, *subreq = NULL;
	struct ctdbd_pkt_read_state *state = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
	if (req == NULL) {
		return NULL;
	}
	subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
	return req;
}

static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data)
{
	uint32_t msglen;
	if (buflen < 4) {
		return -1;
	}
	if (buflen > 4) {
		return 0;	/* Been here, done */
	}
	memcpy(&msglen, buf, 4);

	if (msglen < sizeof(struct ctdb_req_header)) {
		return -1;
	}
	return msglen - sizeof(msglen);
}

static void ctdbd_pkt_read_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_pkt_read_state *state = tevent_req_data(
		req, struct ctdbd_pkt_read_state);
	ssize_t nread;
	int err;

	nread = read_packet_recv(subreq, state, &state->pkt, &err);
	TALLOC_FREE(subreq);
	if (nread == -1) {
		tevent_req_error(req, err);
		return;
	}
	tevent_req_done(req);
}

static int ctdbd_pkt_read_recv(
	struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
{
	struct ctdbd_pkt_read_state *state = tevent_req_data(
		req, struct ctdbd_pkt_read_state);
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		return err;
	}
	*pkt = talloc_move(mem_ctx, &state->pkt);
	tevent_req_received(req);
	return 0;
}
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
static void ctdbd_conn_received(struct tevent_req *subreq);

struct ctdbd_req_state {
	struct ctdbd_connection *conn;
	struct tevent_context *ev;
	uint32_t reqid;
	struct ctdb_req_header *reply;
};
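
/*
 * Bookkeeping for in-flight async requests: every ctdbd_req_send() adds its
 * tevent_req to conn->pending and removes it again in its cleanup function.
 */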
static void ctdbd_req_unset_pending(struct tevent_req *req)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	struct ctdbd_connection *conn = state->conn;
	size_t num_pending = talloc_array_length(conn->pending);
	size_t i, num_after;

	tevent_req_set_cleanup_fn(req, NULL);

	if (num_pending == 1) {
		/*
		 * conn->read_req is a child of conn->pending
		 */
		TALLOC_FREE(conn->pending);
		conn->read_req = NULL;
		return;
	}

	for (i=0; i<num_pending; i++) {
		if (req == conn->pending[i]) {
			break;
		}
	}
	if (i == num_pending) {
		/*
		 * Something's seriously broken. Just returning here is the
		 * right thing nevertheless, the point of this routine is to
		 * remove ourselves from conn->pending.
		 */
		return;
	}

	num_after = num_pending - i - 1;
	if (num_after > 0) {
		memmove(&conn->pending[i],
			&conn->pending[i] + 1,
			sizeof(*conn->pending) * num_after);
	}
	conn->pending = talloc_realloc(
		NULL, conn->pending, struct tevent_req *, num_pending - 1);
}

static void ctdbd_req_cleanup(
	struct tevent_req *req, enum tevent_req_state req_state)
{
	ctdbd_req_unset_pending(req);
}

static bool ctdbd_req_set_pending(struct tevent_req *req)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	struct ctdbd_connection *conn = state->conn;
	struct tevent_req **pending = NULL;
	size_t num_pending = talloc_array_length(conn->pending);
	bool ok;

	pending = talloc_realloc(
		conn, conn->pending, struct tevent_req *, num_pending + 1);
	if (pending == NULL) {
		return false;
	}
	pending[num_pending] = req;
	conn->pending = pending;

	tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);

	ok = ctdbd_conn_receive_next(conn);
	if (!ok) {
		ctdbd_req_unset_pending(req);
		return false;
	}

	return true;
}
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
{
	size_t num_pending = talloc_array_length(conn->pending);
	struct tevent_req *req = NULL;
	struct ctdbd_req_state *state = NULL;

	if (conn->read_req != NULL) {
		return true;
	}
	if (num_pending == 0) {
		/*
		 * done for now
		 */
		return true;
	}

	req = conn->pending[0];
	state = tevent_req_data(req, struct ctdbd_req_state);

	conn->read_req = ctdbd_pkt_read_send(
		conn->pending, state->ev, conn->fd);
	if (conn->read_req == NULL) {
		return false;
	}
	tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
	return true;
}
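
/*
 * A packet arrived on the async connection: match it against the pending
 * requests by reqid and hand it over; replies for unknown reqids are dropped.
 */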
static void ctdbd_conn_received(struct tevent_req *subreq)
{
	struct ctdbd_connection *conn = tevent_req_callback_data(
		subreq, struct ctdbd_connection);
	TALLOC_CTX *frame = talloc_stackframe();
	uint8_t *pkt = NULL;
	int ret;
	struct ctdb_req_header *hdr = NULL;
	uint32_t reqid;
	struct tevent_req *req = NULL;
	struct ctdbd_req_state *state = NULL;
	size_t i, num_pending;
	bool ok;

	SMB_ASSERT(subreq == conn->read_req);
	conn->read_req = NULL;

	ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		cluster_fatal("ctdbd_pkt_read failed\n");
	}

	hdr = (struct ctdb_req_header *)pkt;
	reqid = hdr->reqid;
	num_pending = talloc_array_length(conn->pending);

	for (i=0; i<num_pending; i++) {
		req = conn->pending[i];
		state = tevent_req_data(req, struct ctdbd_req_state);
		if (state->reqid == reqid) {
			break;
		}
	}

	if (i == num_pending) {
		/* not found */
		TALLOC_FREE(frame);
		return;
	}

	state->reply = talloc_move(state, &hdr);
	tevent_req_defer_callback(req, state->ev);
	tevent_req_done(req);

	TALLOC_FREE(frame);

	ok = ctdbd_conn_receive_next(conn);
	if (!ok) {
		cluster_fatal("ctdbd_conn_receive_next failed\n");
	}
}
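
/*
 * Queue a pre-built ctdb packet (header in iov[0]) to ctdbd and wait
 * asynchronously for the reply carrying the same reqid.
 */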
static void ctdbd_req_written(struct tevent_req *subreq);

struct tevent_req *ctdbd_req_send(
	TALLOC_CTX *mem_ctx,
	struct tevent_context *ev,
	struct ctdbd_connection *conn,
	struct iovec *iov,
	size_t num_iov)
{
	struct tevent_req *req = NULL, *subreq = NULL;
	struct ctdbd_req_state *state = NULL;
	struct ctdb_req_header *hdr = NULL;
	bool ok;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
	if (req == NULL) {
		return NULL;
	}
	state->conn = conn;
	state->ev = ev;

	if ((num_iov == 0) ||
	    (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
		tevent_req_error(req, EINVAL);
		return tevent_req_post(req, ev);
	}
	hdr = iov[0].iov_base;
	state->reqid = hdr->reqid;

	ok = ctdbd_req_set_pending(req);
	if (!ok) {
		tevent_req_oom(req);
		return tevent_req_post(req, ev);
	}

	subreq = writev_send(
		state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_req_written, req);

	return req;
}
static void ctdbd_req_written(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	ssize_t nwritten;
	int err;

	nwritten = writev_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (nwritten == -1) {
		tevent_req_error(req, err);
		return;
	}
}

int ctdbd_req_recv(
	struct tevent_req *req,
	TALLOC_CTX *mem_ctx,
	struct ctdb_req_header **reply)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		return err;
	}
	*reply = talloc_move(mem_ctx, &state->reply);
	tevent_req_received(req);
	return 0;
}
struct ctdbd_parse_state {
	struct tevent_context *ev;
	struct ctdbd_connection *conn;
	uint32_t reqid;
	TDB_DATA key;
	uint8_t _keybuf[64];
	struct ctdb_req_call_old ctdb_req;
	struct iovec iov[2];
	void (*parser)(TDB_DATA key,
		       TDB_DATA data,
		       void *private_data);
	void *private_data;
};

static void ctdbd_parse_done(struct tevent_req *subreq);
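
/*
 * Async version of ctdbd_parse(): send a CTDB_FETCH_FUNC call for the key
 * and run the parser on the returned record. A minimal usage sketch
 * (assuming a caller that already owns an async ctdbd_connection and a
 * tevent loop; the names below are illustrative only):
 *
 *	subreq = ctdbd_parse_send(mem_ctx, ev, conn, db_id, key, false,
 *				  my_parser, my_state, &req_state);
 *	tevent_req_set_callback(subreq, my_done_fn, my_state);
 *	...
 *	ret = ctdbd_parse_recv(subreq);
 */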
struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdbd_connection *conn,
				    uint32_t db_id,
				    TDB_DATA key,
				    bool local_copy,
				    void (*parser)(TDB_DATA key,
						   TDB_DATA data,
						   void *private_data),
				    void *private_data,
				    enum dbwrap_req_state *req_state)
{
	struct tevent_req *req = NULL;
	struct ctdbd_parse_state *state = NULL;
	uint32_t flags;
	uint32_t packet_length;
	struct tevent_req *subreq = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
	if (req == NULL) {
		*req_state = DBWRAP_REQ_ERROR;
		return NULL;
	}

	*req_state = DBWRAP_REQ_DISPATCHED;

	*state = (struct ctdbd_parse_state) {
		.ev = ev,
		.conn = conn,
		.reqid = ctdbd_next_reqid(conn),
		.parser = parser,
		.private_data = private_data,
	};

	flags = local_copy ? CTDB_WANT_READONLY : 0;
	packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;

	/*
	 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires
	 * that all passed iov elements have a lifetime longer than the
	 * tevent_req returned by ctdb_pkt_send_send(). This is required to
	 * continue sending the low level request into the ctdb socket, if a
	 * higher level ('this') request is canceled (or talloc free'd) by the
	 * application layer, without sending invalid packets to ctdb.
	 */
	if (key.dsize > sizeof(state->_keybuf)) {
		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
		if (tevent_req_nomem(state->key.dptr, req)) {
			return tevent_req_post(req, ev);
		}
	} else {
		memcpy(state->_keybuf, key.dptr, key.dsize);
		state->key.dptr = state->_keybuf;
	}
	state->key.dsize = key.dsize;

	state->ctdb_req.hdr.length = packet_length;
	state->ctdb_req.hdr.ctdb_magic = CTDB_MAGIC;
	state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
	state->ctdb_req.hdr.operation = CTDB_REQ_CALL;
	state->ctdb_req.hdr.reqid = state->reqid;
	state->ctdb_req.flags = flags;
	state->ctdb_req.callid = CTDB_FETCH_FUNC;
	state->ctdb_req.db_id = db_id;
	state->ctdb_req.keylen = state->key.dsize;

	state->iov[0].iov_base = &state->ctdb_req;
	state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	state->iov[1].iov_base = state->key.dptr;
	state->iov[1].iov_len = state->key.dsize;

	subreq = ctdbd_req_send(
		state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
	if (tevent_req_nomem(subreq, req)) {
		*req_state = DBWRAP_REQ_ERROR;
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_parse_done, req);

	return req;
}
static void ctdbd_parse_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_parse_state *state = tevent_req_data(
		req, struct ctdbd_parse_state);
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply = NULL;
	int ret;

	ret = ctdbd_req_recv(subreq, state, &hdr);
	TALLOC_FREE(subreq);
	if (tevent_req_error(req, ret)) {
		DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
		return;
	}
	SMB_ASSERT(hdr != NULL);

	if (hdr->operation != CTDB_REPLY_CALL) {
		DBG_ERR("received invalid reply\n");
		ctdb_packet_dump(hdr);
		tevent_req_error(req, EIO);
		return;
	}

	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		tevent_req_error(req, ENOENT);
		return;
	}

	state->parser(state->key,
		      make_tdb_data(&reply->data[0], reply->datalen),
		      state->private_data);

	tevent_req_done(req);
	return;
}
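
/*
 * Collect the result of ctdbd_parse_send(); the parser has already run by
 * the time this returns 0.
 */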
int ctdbd_parse_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}