s3:ctdbd_conn: fix ctdbd_public_ip_foreach() for ipv6 addresses
[Samba.git] / source3 / lib / ctdbd_conn.c
blob3698c9d3672473e9f2719f0e4c9a3b7b1130d67f
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "replace.h"
22 #include <tevent.h>
23 #include "util_tdb.h"
24 #include "serverid.h"
25 #include "ctdbd_conn.h"
26 #include "system/select.h"
27 #include "lib/util/util_net.h"
28 #include "lib/util/sys_rw_data.h"
29 #include "lib/util/iov_buf.h"
30 #include "lib/util/select.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/talloc_stack.h"
33 #include "lib/util/genrand.h"
34 #include "lib/util/fault.h"
35 #include "lib/util/dlinklist.h"
36 #include "lib/util/tevent_unix.h"
37 #include "lib/util/sys_rw.h"
38 #include "lib/util/blocking.h"
39 #include "ctdb/include/ctdb_protocol.h"
40 #include "lib/async_req/async_sock.h"
42 /* paths to these include files come from --with-ctdb= in configure */
/*
 * One entry of the per-connection message handler table: ctdb messages
 * addressed to "srvid" are passed to "cb" along with "private_data".
 * "cb" may be NULL (such entries are skipped when dispatching).
 */
struct ctdbd_srvid_cb {
	uint64_t srvid;
	int (*cb)(struct tevent_context *ev,
		  uint32_t src_vnn, uint32_t dst_vnn,
		  uint64_t dst_srvid,
		  const uint8_t *msg, size_t msglen,
		  void *private_data);
	void *private_data;
};
/*
 * State of one connection to the local ctdbd.
 */
struct ctdbd_connection {
	uint32_t reqid;			/* last id from ctdbd_next_reqid() */
	uint32_t our_vnn;		/* our node number, from GET_PNN */
	uint64_t rand_srvid;		/* random srvid registered at init */
	struct ctdbd_srvid_cb *callbacks; /* talloc array of handlers */
	int fd;				/* unix socket to ctdbd */
	int timeout;			/* poll timeout; -1 = no poll */

	/*
	 * Outgoing queue for writev_send of asynchronous ctdb requests
	 */
	struct tevent_queue *outgoing;
	struct tevent_req **pending;	/* talloc array of async requests */
	struct tevent_req *read_req;
};
70 static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
72 size_t len = talloc_array_length(conn->pending);
73 return (len != 0);
76 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
78 conn->reqid += 1;
79 if (conn->reqid == 0) {
80 conn->reqid += 1;
82 return conn->reqid;
85 static int ctdbd_control(struct ctdbd_connection *conn,
86 uint32_t vnn, uint32_t opcode,
87 uint64_t srvid, uint32_t flags,
88 TDB_DATA data,
89 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
90 int32_t *cstatus);
/*
 * exit on fatal communications errors with the ctdbd daemon
 *
 * Logs "why" at level 0 and terminates the process with _exit(1).
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));

	/*
	 * smb_panic() is deliberately not used: writing a core file would
	 * delay the exit. This process id has to be released immediately
	 * so that someone else can take over without getting sharing
	 * violations.
	 */
	_exit(1);
}
108 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
110 if (DEBUGLEVEL < 11) {
111 return;
113 DEBUGADD(11, ("len=%"PRIu32", magic=%"PRIu32", vers=%"PRIu32", "
114 "gen=%"PRIu32", op=%"PRIu32", reqid=%"PRIu32"\n",
115 hdr->length,
116 hdr->ctdb_magic,
117 hdr->ctdb_version,
118 hdr->generation,
119 hdr->operation,
120 hdr->reqid));
124 * Register a srvid with ctdbd
126 int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
127 int (*cb)(struct tevent_context *ev,
128 uint32_t src_vnn, uint32_t dst_vnn,
129 uint64_t dst_srvid,
130 const uint8_t *msg, size_t msglen,
131 void *private_data),
132 void *private_data)
134 size_t num_callbacks = talloc_array_length(conn->callbacks);
135 struct ctdbd_srvid_cb *tmp;
136 bool need_register = true;
137 size_t i;
139 for (i = 0; i < num_callbacks; i++) {
140 struct ctdbd_srvid_cb *c = &conn->callbacks[i];
142 if (c->srvid == srvid) {
143 need_register = false;
144 break;
148 if (need_register) {
149 int ret;
150 int32_t cstatus;
152 ret = ctdbd_control_local(conn, CTDB_CONTROL_REGISTER_SRVID,
153 srvid, 0, tdb_null, NULL, NULL,
154 &cstatus);
155 if (ret != 0) {
156 return ret;
161 tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
162 num_callbacks + 1);
163 if (tmp == NULL) {
164 return ENOMEM;
166 conn->callbacks = tmp;
168 conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
169 .srvid = srvid, .cb = cb, .private_data = private_data
172 return 0;
175 void deregister_from_ctdbd(struct ctdbd_connection *conn,
176 uint64_t srvid,
177 int (*cb)(struct tevent_context *ev,
178 uint32_t src_vnn,
179 uint32_t dst_vnn,
180 uint64_t dst_srvid,
181 const uint8_t *msg,
182 size_t msglen,
183 void *private_data),
184 void *private_data)
186 struct ctdbd_srvid_cb *cbs = conn->callbacks;
187 size_t i, num_callbacks = talloc_array_length(cbs);
188 bool need_deregister = false;
189 bool keep_registration = false;
191 if (num_callbacks == 0) {
192 return;
195 for (i = 0; i < num_callbacks;) {
196 struct ctdbd_srvid_cb *c = &cbs[i];
198 if (c->srvid != srvid) {
199 i++;
200 continue;
203 if ((c->cb == cb) && (c->private_data == private_data)) {
204 need_deregister = true;
205 ARRAY_DEL_ELEMENT(cbs, i, num_callbacks);
206 num_callbacks--;
207 continue;
210 keep_registration = true;
211 i++;
214 conn->callbacks = talloc_realloc(conn,
215 cbs,
216 struct ctdbd_srvid_cb,
217 num_callbacks);
219 if (keep_registration) {
220 need_deregister = false;
223 if (need_deregister) {
224 int ret;
225 int32_t cstatus;
227 ret = ctdbd_control_local(conn, CTDB_CONTROL_DEREGISTER_SRVID,
228 srvid, 0, tdb_null, NULL, NULL,
229 &cstatus);
230 if (ret != 0) {
232 * If CTDB_CONTROL_DEREGISTER_SRVID fails we may still
233 * get messages later, but we don't have a callback
234 * anymore, we just ignore these.
239 return;
242 static int ctdbd_msg_call_back(struct tevent_context *ev,
243 struct ctdbd_connection *conn,
244 struct ctdb_req_message_old *msg)
246 uint32_t msg_len;
247 size_t i, num_callbacks;
249 msg_len = msg->hdr.length;
250 if (msg_len < offsetof(struct ctdb_req_message_old, data)) {
251 DBG_DEBUG("len %"PRIu32" too small\n", msg_len);
252 return 0;
254 msg_len -= offsetof(struct ctdb_req_message_old, data);
256 if (msg_len < msg->datalen) {
257 DBG_DEBUG("msg_len=%"PRIu32" < msg->datalen=%"PRIu32"\n",
258 msg_len, msg->datalen);
259 return 0;
262 num_callbacks = talloc_array_length(conn->callbacks);
264 for (i=0; i<num_callbacks; i++) {
265 struct ctdbd_srvid_cb *cb = &conn->callbacks[i];
267 if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
268 int ret;
270 ret = cb->cb(ev,
271 msg->hdr.srcnode, msg->hdr.destnode,
272 msg->srvid, msg->data, msg->datalen,
273 cb->private_data);
274 if (ret != 0) {
275 return ret;
279 return 0;
283 * get our vnn from the cluster
285 static int get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
287 int32_t cstatus=-1;
288 int ret;
289 ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_PNN, 0, 0,
290 tdb_null, NULL, NULL, &cstatus);
291 if (ret != 0) {
292 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
293 return ret;
295 *vnn = (uint32_t)cstatus;
296 return ret;
300 * Are we active (i.e. not banned or stopped?)
302 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
304 int32_t cstatus=-1;
305 TDB_DATA outdata = {0};
306 struct ctdb_node_map_old *m;
307 bool ok = false;
308 uint32_t i;
309 int ret;
311 ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_NODEMAP, 0, 0,
312 tdb_null, talloc_tos(), &outdata, &cstatus);
313 if (ret != 0) {
314 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
315 return false;
317 if ((cstatus != 0) || (outdata.dptr == NULL)) {
318 DEBUG(2, ("Received invalid ctdb data\n"));
319 goto fail;
322 m = (struct ctdb_node_map_old *)outdata.dptr;
324 for (i=0; i<m->num; i++) {
325 if (vnn == m->nodes[i].pnn) {
326 break;
330 if (i == m->num) {
331 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
332 (int)vnn));
333 goto fail;
336 if ((m->nodes[i].flags & NODE_FLAGS_INACTIVE) != 0) {
337 DEBUG(2, ("Node has status %x, not active\n",
338 (int)m->nodes[i].flags));
339 goto fail;
342 ok = true;
343 fail:
344 TALLOC_FREE(outdata.dptr);
345 return ok;
348 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
350 return conn->our_vnn;
354 * Get us a ctdb connection
357 static int ctdbd_connect(const char *sockname, int *pfd)
359 struct samba_sockaddr addr = {
360 .sa_socklen = sizeof(struct sockaddr_un),
361 .u = {
362 .un = {
363 .sun_family = AF_UNIX,
367 int fd;
368 size_t namelen;
369 int ret;
371 fd = socket(AF_UNIX, SOCK_STREAM, 0);
372 if (fd == -1) {
373 int err = errno;
374 DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
375 return err;
378 namelen = strlcpy(addr.u.un.sun_path,
379 sockname,
380 sizeof(addr.u.un.sun_path));
381 if (namelen >= sizeof(addr.u.un.sun_path)) {
382 DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
383 sockname));
384 close(fd);
385 return ENAMETOOLONG;
388 ret = connect(fd, &addr.u.sa, addr.sa_socklen);
389 if (ret == -1) {
390 int err = errno;
391 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
392 strerror(err)));
393 close(fd);
394 return err;
397 *pfd = fd;
398 return 0;
401 static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
402 struct ctdb_req_header **result)
404 struct ctdb_req_header *req;
405 uint32_t msglen;
406 ssize_t nread;
408 if (timeout != -1) {
409 struct pollfd pfd = { .fd = fd, .events = POLLIN };
410 int ret;
412 ret = sys_poll_intr(&pfd, 1, timeout);
413 if (ret == -1) {
414 return errno;
416 if (ret == 0) {
417 return ETIMEDOUT;
419 if (ret != 1) {
420 return EIO;
424 nread = read_data(fd, &msglen, sizeof(msglen));
425 if (nread == -1) {
426 return errno;
428 if (nread == 0) {
429 return EIO;
432 if (msglen < sizeof(struct ctdb_req_header)) {
433 return EIO;
436 req = talloc_size(mem_ctx, msglen);
437 if (req == NULL) {
438 return ENOMEM;
440 talloc_set_name_const(req, "struct ctdb_req_header");
442 req->length = msglen;
444 nread = read_data(fd, ((char *)req) + sizeof(msglen),
445 msglen - sizeof(msglen));
446 if (nread == -1) {
447 TALLOC_FREE(req);
448 return errno;
450 if (nread == 0) {
451 TALLOC_FREE(req);
452 return EIO;
455 *result = req;
456 return 0;
460 * Read a full ctdbd request. If we have a messaging context, defer incoming
461 * messages that might come in between.
464 static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
465 TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
467 struct ctdb_req_header *hdr = NULL;
468 int ret;
470 next_pkt:
472 ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
473 if (ret != 0) {
474 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
475 cluster_fatal("failed to read data from ctdbd\n");
476 return -1;
478 SMB_ASSERT(hdr != NULL);
480 DEBUG(11, ("Received ctdb packet\n"));
481 ctdb_packet_dump(hdr);
483 if (hdr->operation == CTDB_REQ_MESSAGE) {
484 struct ctdb_req_message_old *msg = (struct ctdb_req_message_old *)hdr;
486 ret = ctdbd_msg_call_back(NULL, conn, msg);
487 if (ret != 0) {
488 TALLOC_FREE(hdr);
489 return ret;
492 TALLOC_FREE(hdr);
493 goto next_pkt;
496 if ((reqid != 0) && (hdr->reqid != reqid)) {
497 /* we got the wrong reply */
498 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
499 "been %u\n", hdr->reqid, reqid));
500 TALLOC_FREE(hdr);
501 goto next_pkt;
504 *result = talloc_move(mem_ctx, &hdr);
506 return 0;
509 static int ctdbd_connection_destructor(struct ctdbd_connection *c);
512 * Get us a ctdbd connection
515 static int ctdbd_init_connection_internal(TALLOC_CTX *mem_ctx,
516 const char *sockname, int timeout,
517 struct ctdbd_connection *conn)
519 int ret;
521 conn->timeout = timeout;
522 if (conn->timeout == 0) {
523 conn->timeout = -1;
526 ret = ctdbd_connect(sockname, &conn->fd);
527 if (ret != 0) {
528 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
529 return ret;
531 talloc_set_destructor(conn, ctdbd_connection_destructor);
533 ret = get_cluster_vnn(conn, &conn->our_vnn);
534 if (ret != 0) {
535 DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret)));
536 return ret;
539 if (!ctdbd_working(conn, conn->our_vnn)) {
540 DEBUG(2, ("Node is not working, can not connect\n"));
541 return EIO;
544 generate_random_buffer((unsigned char *)&conn->rand_srvid,
545 sizeof(conn->rand_srvid));
547 ret = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
548 if (ret != 0) {
549 DEBUG(5, ("Could not register random srvid: %s\n",
550 strerror(ret)));
551 return ret;
554 return 0;
557 int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
558 const char *sockname, int timeout,
559 struct ctdbd_connection **pconn)
561 struct ctdbd_connection *conn;
562 int ret;
564 if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
565 DEBUG(0, ("talloc failed\n"));
566 return ENOMEM;
569 ret = ctdbd_init_connection_internal(mem_ctx,
570 sockname,
571 timeout,
572 conn);
573 if (ret != 0) {
574 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
575 strerror(ret));
576 goto fail;
579 *pconn = conn;
580 return 0;
582 fail:
583 TALLOC_FREE(conn);
584 return ret;
587 int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
588 const char *sockname, int timeout,
589 struct ctdbd_connection *conn)
591 int ret;
593 ret = ctdbd_connection_destructor(conn);
594 if (ret != 0) {
595 DBG_ERR("ctdbd_connection_destructor failed\n");
596 return ret;
599 ret = ctdbd_init_connection_internal(mem_ctx,
600 sockname,
601 timeout,
602 conn);
603 if (ret != 0) {
604 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
605 strerror(ret));
606 return ret;
609 return 0;
612 int ctdbd_init_async_connection(
613 TALLOC_CTX *mem_ctx,
614 const char *sockname,
615 int timeout,
616 struct ctdbd_connection **pconn)
618 struct ctdbd_connection *conn = NULL;
619 int ret;
621 *pconn = NULL;
623 ret = ctdbd_init_connection(mem_ctx, sockname, timeout, &conn);
624 if (ret != 0) {
625 return ret;
628 ret = set_blocking(conn->fd, false);
629 if (ret == -1) {
630 int err = errno;
631 SMB_ASSERT(err != 0);
632 TALLOC_FREE(conn);
633 return err;
636 conn->outgoing = tevent_queue_create(conn, "ctdb async outgoing");
637 if (conn->outgoing == NULL) {
638 TALLOC_FREE(conn);
639 return ENOMEM;
642 *pconn = conn;
643 return 0;
646 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
648 return conn->fd;
652 * Packet handler to receive and handle a ctdb message
654 static int ctdb_handle_message(struct tevent_context *ev,
655 struct ctdbd_connection *conn,
656 struct ctdb_req_header *hdr)
658 struct ctdb_req_message_old *msg;
660 if (hdr->operation != CTDB_REQ_MESSAGE) {
661 DEBUG(0, ("Received async msg of type %u, discarding\n",
662 hdr->operation));
663 return EINVAL;
666 msg = (struct ctdb_req_message_old *)hdr;
668 ctdbd_msg_call_back(ev, conn, msg);
670 return 0;
673 void ctdbd_socket_readable(struct tevent_context *ev,
674 struct ctdbd_connection *conn)
676 struct ctdb_req_header *hdr = NULL;
677 int ret;
679 ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
680 if (ret != 0) {
681 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
682 cluster_fatal("failed to read data from ctdbd\n");
684 SMB_ASSERT(hdr != NULL);
686 ret = ctdb_handle_message(ev, conn, hdr);
688 TALLOC_FREE(hdr);
690 if (ret != 0) {
691 DEBUG(10, ("could not handle incoming message: %s\n",
692 strerror(ret)));
696 int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
697 uint32_t dst_vnn, uint64_t dst_srvid,
698 const struct iovec *iov, int iovlen)
700 struct ctdb_req_message_old r;
701 struct iovec iov2[iovlen+1];
702 size_t buflen = iov_buflen(iov, iovlen);
703 ssize_t nwritten;
705 r.hdr.length = offsetof(struct ctdb_req_message_old, data) + buflen;
706 r.hdr.ctdb_magic = CTDB_MAGIC;
707 r.hdr.ctdb_version = CTDB_PROTOCOL;
708 r.hdr.generation = 1;
709 r.hdr.operation = CTDB_REQ_MESSAGE;
710 r.hdr.destnode = dst_vnn;
711 r.hdr.srcnode = conn->our_vnn;
712 r.hdr.reqid = 0;
713 r.srvid = dst_srvid;
714 r.datalen = buflen;
716 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
717 ctdb_packet_dump(&r.hdr);
719 iov2[0].iov_base = &r;
720 iov2[0].iov_len = offsetof(struct ctdb_req_message_old, data);
721 memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));
723 nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
724 if (nwritten == -1) {
725 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
726 cluster_fatal("cluster dispatch daemon msg write error\n");
729 return 0;
733 * send/recv a generic ctdb control message
735 static int ctdbd_control(struct ctdbd_connection *conn,
736 uint32_t vnn, uint32_t opcode,
737 uint64_t srvid, uint32_t flags,
738 TDB_DATA data,
739 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
740 int32_t *cstatus)
742 struct ctdb_req_control_old req;
743 struct ctdb_req_header *hdr;
744 struct ctdb_reply_control_old *reply = NULL;
745 struct iovec iov[2];
746 ssize_t nwritten;
747 int ret;
749 if (ctdbd_conn_has_async_reqs(conn)) {
751 * Can't use sync call while an async call is in flight. Adding
752 * this check as a safety net. We'll be using different
753 * connections for sync and async requests, so this shouldn't
754 * happen, but who knows...
756 DBG_ERR("Async ctdb req on sync connection\n");
757 return EINVAL;
760 ZERO_STRUCT(req);
761 req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
762 req.hdr.ctdb_magic = CTDB_MAGIC;
763 req.hdr.ctdb_version = CTDB_PROTOCOL;
764 req.hdr.operation = CTDB_REQ_CONTROL;
765 req.hdr.reqid = ctdbd_next_reqid(conn);
766 req.hdr.destnode = vnn;
767 req.opcode = opcode;
768 req.srvid = srvid;
769 req.datalen = data.dsize;
770 req.flags = flags;
772 DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32", vnn=%"PRIu32", "
773 "opcode=%"PRIu32", srvid=%"PRIu64"\n", req.hdr.reqid,
774 req.hdr.destnode, req.opcode, req.srvid);
775 ctdb_packet_dump(&req.hdr);
777 iov[0].iov_base = &req;
778 iov[0].iov_len = offsetof(struct ctdb_req_control_old, data);
779 iov[1].iov_base = data.dptr;
780 iov[1].iov_len = data.dsize;
782 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
783 if (nwritten == -1) {
784 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
785 cluster_fatal("cluster dispatch daemon msg write error\n");
788 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
789 if (cstatus) {
790 *cstatus = 0;
792 return 0;
795 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
796 if (ret != 0) {
797 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
798 return ret;
801 if (hdr->operation != CTDB_REPLY_CONTROL) {
802 DEBUG(0, ("received invalid reply\n"));
803 TALLOC_FREE(hdr);
804 return EIO;
806 reply = (struct ctdb_reply_control_old *)hdr;
808 if (outdata) {
809 if (!(outdata->dptr = (uint8_t *)talloc_memdup(
810 mem_ctx, reply->data, reply->datalen))) {
811 TALLOC_FREE(reply);
812 return ENOMEM;
814 outdata->dsize = reply->datalen;
816 if (cstatus) {
817 (*cstatus) = reply->status;
820 TALLOC_FREE(reply);
821 return ret;
825 * see if a remote process exists
827 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn,
828 pid_t pid, uint64_t unique_id)
830 uint8_t buf[sizeof(pid)+sizeof(unique_id)];
831 int32_t cstatus = 0;
832 int ret;
834 if (unique_id == SERVERID_UNIQUE_ID_NOT_TO_VERIFY) {
835 ret = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS,
836 0, 0,
837 (TDB_DATA) { .dptr = (uint8_t *)&pid,
838 .dsize = sizeof(pid) },
839 NULL, NULL, &cstatus);
840 if (ret != 0) {
841 return false;
843 return (cstatus == 0);
846 memcpy(buf, &pid, sizeof(pid));
847 memcpy(buf+sizeof(pid), &unique_id, sizeof(unique_id));
849 ret = ctdbd_control(conn, vnn, CTDB_CONTROL_CHECK_PID_SRVID, 0, 0,
850 (TDB_DATA) { .dptr = buf, .dsize = sizeof(buf) },
851 NULL, NULL, &cstatus);
852 if (ret != 0) {
853 return false;
855 return (cstatus == 0);
859 * Get a db path
861 char *ctdbd_dbpath(struct ctdbd_connection *conn,
862 TALLOC_CTX *mem_ctx, uint32_t db_id)
864 int ret;
865 TDB_DATA data;
866 TDB_DATA rdata = {0};
867 int32_t cstatus = 0;
869 data.dptr = (uint8_t*)&db_id;
870 data.dsize = sizeof(db_id);
872 ret = ctdbd_control_local(conn, CTDB_CONTROL_GETDBPATH, 0, 0, data,
873 mem_ctx, &rdata, &cstatus);
874 if ((ret != 0) || cstatus != 0) {
875 DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s\n",
876 strerror(ret)));
877 TALLOC_FREE(rdata.dptr);
880 return (char *)rdata.dptr;
884 * attach to a ctdb database
886 int ctdbd_db_attach(struct ctdbd_connection *conn,
887 const char *name, uint32_t *db_id, bool persistent)
889 int ret;
890 TDB_DATA data = {0};
891 int32_t cstatus;
893 data = string_term_tdb_data(name);
895 ret = ctdbd_control_local(conn,
896 persistent
897 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
898 : CTDB_CONTROL_DB_ATTACH,
899 0, 0, data, NULL, &data, &cstatus);
900 if (ret != 0) {
901 DEBUG(0, (__location__ " ctdb_control for db_attach "
902 "failed: %s\n", strerror(ret)));
903 return ret;
906 if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
907 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
908 TALLOC_FREE(data.dptr);
909 return EIO;
912 *db_id = *(uint32_t *)data.dptr;
913 talloc_free(data.dptr);
915 return 0;
919 * force the migration of a record to this node
921 int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
923 struct ctdb_req_call_old req;
924 struct ctdb_req_header *hdr = NULL;
925 struct iovec iov[2];
926 ssize_t nwritten;
927 int ret;
929 if (ctdbd_conn_has_async_reqs(conn)) {
931 * Can't use sync call while an async call is in flight. Adding
932 * this check as a safety net. We'll be using different
933 * connections for sync and async requests, so this shouldn't
934 * happen, but who knows...
936 DBG_ERR("Async ctdb req on sync connection\n");
937 return EINVAL;
940 ZERO_STRUCT(req);
942 req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
943 req.hdr.ctdb_magic = CTDB_MAGIC;
944 req.hdr.ctdb_version = CTDB_PROTOCOL;
945 req.hdr.operation = CTDB_REQ_CALL;
946 req.hdr.reqid = ctdbd_next_reqid(conn);
947 req.flags = CTDB_IMMEDIATE_MIGRATION;
948 req.callid = CTDB_NULL_FUNC;
949 req.db_id = db_id;
950 req.keylen = key.dsize;
952 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
953 ctdb_packet_dump(&req.hdr);
955 iov[0].iov_base = &req;
956 iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
957 iov[1].iov_base = key.dptr;
958 iov[1].iov_len = key.dsize;
960 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
961 if (nwritten == -1) {
962 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
963 cluster_fatal("cluster dispatch daemon msg write error\n");
966 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
967 if (ret != 0) {
968 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
969 goto fail;
972 if (hdr->operation != CTDB_REPLY_CALL) {
973 if (hdr->operation == CTDB_REPLY_ERROR) {
974 DBG_ERR("received error from ctdb\n");
975 } else {
976 DBG_ERR("received invalid reply\n");
978 ret = EIO;
979 goto fail;
982 fail:
984 TALLOC_FREE(hdr);
985 return ret;
989 * Fetch a record and parse it
991 int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
992 TDB_DATA key, bool local_copy,
993 void (*parser)(TDB_DATA key, TDB_DATA data,
994 void *private_data),
995 void *private_data)
997 struct ctdb_req_call_old req;
998 struct ctdb_req_header *hdr = NULL;
999 struct ctdb_reply_call_old *reply;
1000 struct iovec iov[2];
1001 ssize_t nwritten;
1002 uint32_t flags;
1003 int ret;
1005 if (ctdbd_conn_has_async_reqs(conn)) {
1007 * Can't use sync call while an async call is in flight. Adding
1008 * this check as a safety net. We'll be using different
1009 * connections for sync and async requests, so this shouldn't
1010 * happen, but who knows...
1012 DBG_ERR("Async ctdb req on sync connection\n");
1013 return EINVAL;
1016 flags = local_copy ? CTDB_WANT_READONLY : 0;
1018 ZERO_STRUCT(req);
1020 req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
1021 req.hdr.ctdb_magic = CTDB_MAGIC;
1022 req.hdr.ctdb_version = CTDB_PROTOCOL;
1023 req.hdr.operation = CTDB_REQ_CALL;
1024 req.hdr.reqid = ctdbd_next_reqid(conn);
1025 req.flags = flags;
1026 req.callid = CTDB_FETCH_FUNC;
1027 req.db_id = db_id;
1028 req.keylen = key.dsize;
1030 iov[0].iov_base = &req;
1031 iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
1032 iov[1].iov_base = key.dptr;
1033 iov[1].iov_len = key.dsize;
1035 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1036 if (nwritten == -1) {
1037 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
1038 cluster_fatal("cluster dispatch daemon msg write error\n");
1041 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
1042 if (ret != 0) {
1043 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
1044 goto fail;
1047 if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
1048 DEBUG(0, ("received invalid reply\n"));
1049 ret = EIO;
1050 goto fail;
1052 reply = (struct ctdb_reply_call_old *)hdr;
1054 if (reply->datalen == 0) {
1056 * Treat an empty record as non-existing
1058 ret = ENOENT;
1059 goto fail;
1062 parser(key, make_tdb_data(&reply->data[0], reply->datalen),
1063 private_data);
1065 ret = 0;
1066 fail:
1067 TALLOC_FREE(hdr);
1068 return ret;
1072 Traverse a ctdb database. "conn" must be an otherwise unused
1073 ctdb_connection where no other messages but the traverse ones are
1074 expected.
1077 int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
1078 void (*fn)(TDB_DATA key, TDB_DATA data,
1079 void *private_data),
1080 void *private_data)
1082 int ret;
1083 TDB_DATA key, data;
1084 struct ctdb_traverse_start t;
1085 int32_t cstatus = 0;
1087 if (ctdbd_conn_has_async_reqs(conn)) {
1089 * Can't use sync call while an async call is in flight. Adding
1090 * this check as a safety net. We'll be using different
1091 * connections for sync and async requests, so this shouldn't
1092 * happen, but who knows...
1094 DBG_ERR("Async ctdb req on sync connection\n");
1095 return EINVAL;
1098 t.db_id = db_id;
1099 t.srvid = conn->rand_srvid;
1100 t.reqid = ctdbd_next_reqid(conn);
1102 data.dptr = (uint8_t *)&t;
1103 data.dsize = sizeof(t);
1105 ret = ctdbd_control_local(conn, CTDB_CONTROL_TRAVERSE_START,
1106 conn->rand_srvid,
1107 0, data, NULL, NULL, &cstatus);
1109 if ((ret != 0) || (cstatus != 0)) {
1110 DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret),
1111 cstatus));
1113 if (ret == 0) {
1115 * We need a mapping here
1117 ret = EIO;
1119 return ret;
1122 while (true) {
1123 struct ctdb_req_header *hdr = NULL;
1124 struct ctdb_req_message_old *m;
1125 struct ctdb_rec_data_old *d;
1127 ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
1128 if (ret != 0) {
1129 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
1130 cluster_fatal("failed to read data from ctdbd\n");
1132 SMB_ASSERT(hdr != NULL);
1134 if (hdr->operation != CTDB_REQ_MESSAGE) {
1135 DEBUG(0, ("Got operation %u, expected a message\n",
1136 (unsigned)hdr->operation));
1137 return EIO;
1140 m = (struct ctdb_req_message_old *)hdr;
1141 d = (struct ctdb_rec_data_old *)&m->data[0];
1142 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1143 DEBUG(0, ("Got invalid traverse data of length %d\n",
1144 (int)m->datalen));
1145 return EIO;
1148 key.dsize = d->keylen;
1149 key.dptr = &d->data[0];
1150 data.dsize = d->datalen;
1151 data.dptr = &d->data[d->keylen];
1153 if (key.dsize == 0 && data.dsize == 0) {
1154 /* end of traverse */
1155 return 0;
1158 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1159 DEBUG(0, ("Got invalid ltdb header length %d\n",
1160 (int)data.dsize));
1161 return EIO;
1163 data.dsize -= sizeof(struct ctdb_ltdb_header);
1164 data.dptr += sizeof(struct ctdb_ltdb_header);
1166 if (fn != NULL) {
1167 fn(key, data, private_data);
1170 return 0;
/*
 * This is used to canonicalize a ctdb_sock_addr structure.
 *
 * Copies "in" to "out". When IPv6 support is compiled in (HAVE_IPV6),
 * an IPv4-mapped IPv6 address (::ffff:a.b.c.d) is instead rewritten into
 * the equivalent AF_INET sockaddr with the same port. All other
 * addresses are copied unchanged. "in" and "out" must not overlap.
 */
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		/*
		 * First 12 bytes of an IPv4-mapped IPv6 address. Stored as
		 * uint8_t: 0xff does not fit a signed plain char.
		 */
		static const uint8_t v4mapped_prefix[12] = {
			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff
		};
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)out;

		if (memcmp(&in6->sin6_addr, v4mapped_prefix,
			   sizeof(v4mapped_prefix)) == 0) {
			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			/*
			 * sin_len is the length of the sockaddr_in itself,
			 * not of the sockaddr_storage that contains it.
			 */
			out4->sin_len = sizeof(*out4);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port = in6->sin6_port;
			memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
		}
	}
#endif
}
1201 * Register us as a server for a particular tcp connection
1204 int ctdbd_register_ips(struct ctdbd_connection *conn,
1205 const struct sockaddr_storage *_server,
1206 const struct sockaddr_storage *_client,
1207 int (*cb)(struct tevent_context *ev,
1208 uint32_t src_vnn, uint32_t dst_vnn,
1209 uint64_t dst_srvid,
1210 const uint8_t *msg, size_t msglen,
1211 void *private_data),
1212 void *private_data)
1214 struct ctdb_connection p;
1215 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1216 int ret;
1217 struct sockaddr_storage client;
1218 struct sockaddr_storage server;
1221 * Only one connection so far
1224 smbd_ctdb_canonicalize_ip(_client, &client);
1225 smbd_ctdb_canonicalize_ip(_server, &server);
1227 ZERO_STRUCT(p);
1228 switch (client.ss_family) {
1229 case AF_INET:
1230 memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
1231 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1232 break;
1233 case AF_INET6:
1234 memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
1235 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1236 break;
1237 default:
1238 return EIO;
1242 * We want to be told about IP releases
1245 ret = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1246 cb, private_data);
1247 if (ret != 0) {
1248 return ret;
1252 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1253 * can send an extra ack to trigger a reset for our client, so it
1254 * immediately reconnects
1256 ret = ctdbd_control_local(conn,
1257 CTDB_CONTROL_TCP_CLIENT, 0,
1258 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
1259 NULL);
1260 if (ret != 0) {
1261 return ret;
1263 return 0;
1266 void ctdbd_unregister_ips(struct ctdbd_connection *conn,
1267 const struct sockaddr_storage *_server,
1268 const struct sockaddr_storage *_client,
1269 int (*cb)(struct tevent_context *ev,
1270 uint32_t src_vnn,
1271 uint32_t dst_vnn,
1272 uint64_t dst_srvid,
1273 const uint8_t *msg,
1274 size_t msglen,
1275 void *private_data),
1276 void *private_data)
1278 struct ctdb_connection p;
1279 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1280 int ret;
1281 struct sockaddr_storage client;
1282 struct sockaddr_storage server;
1285 * Only one connection so far
1288 smbd_ctdb_canonicalize_ip(_client, &client);
1289 smbd_ctdb_canonicalize_ip(_server, &server);
1291 ZERO_STRUCT(p);
1292 switch (client.ss_family) {
1293 case AF_INET:
1294 memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
1295 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1296 break;
1297 case AF_INET6:
1298 memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
1299 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1300 break;
1301 default:
1302 return;
1306 * We no longer want to be told about IP releases
1307 * for the given callback/private_data combination
1309 deregister_from_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1310 cb, private_data);
1313 * inform ctdb of our tcp connection is no longer active
1315 ret = ctdbd_control_local(conn,
1316 CTDB_CONTROL_TCP_CLIENT_DISCONNECTED, 0,
1317 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
1318 NULL);
1319 if (ret != 0) {
1321 * We ignore errors here, as we'll just
1322 * no longer have a callback handler
1323 * registered and messages may just be ignored
1327 return;
1330 void ctdbd_passed_ips(struct ctdbd_connection *conn,
1331 const struct sockaddr_storage *_server,
1332 const struct sockaddr_storage *_client,
1333 int (*cb)(struct tevent_context *ev,
1334 uint32_t src_vnn,
1335 uint32_t dst_vnn,
1336 uint64_t dst_srvid,
1337 const uint8_t *msg,
1338 size_t msglen,
1339 void *private_data),
1340 void *private_data)
1342 struct ctdb_connection p;
1343 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1344 int ret;
1345 struct sockaddr_storage client;
1346 struct sockaddr_storage server;
1349 * Only one connection so far
1352 smbd_ctdb_canonicalize_ip(_client, &client);
1353 smbd_ctdb_canonicalize_ip(_server, &server);
1355 ZERO_STRUCT(p);
1356 switch (client.ss_family) {
1357 case AF_INET:
1358 memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
1359 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1360 break;
1361 case AF_INET6:
1362 memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
1363 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1364 break;
1365 default:
1366 return;
1370 * We no longer want to be told about IP releases
1371 * for the given callback/private_data combination
1373 deregister_from_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1374 cb, private_data);
1377 * inform ctdb of our tcp connection is now passed to
1378 * another process.
1380 ret = ctdbd_control_local(conn,
1381 CTDB_CONTROL_TCP_CLIENT_PASSED, 0,
1382 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
1383 NULL);
1384 if (ret != 0) {
1386 * We ignore errors here, as we'll just
1387 * no longer have a callback handler
1388 * registered and messages may just be ignored
1392 return;
1395 static int ctdbd_control_get_public_ips(struct ctdbd_connection *conn,
1396 uint32_t flags,
1397 TALLOC_CTX *mem_ctx,
1398 struct ctdb_public_ip_list_old **_ips)
1400 struct ctdb_public_ip_list_old *ips = NULL;
1401 TDB_DATA outdata;
1402 int32_t cstatus = -1;
1403 size_t min_dsize;
1404 size_t max_ips;
1405 int ret;
1407 *_ips = NULL;
1409 ret = ctdbd_control_local(conn,
1410 CTDB_CONTROL_GET_PUBLIC_IPS,
1411 0, /* srvid */
1412 flags,
1413 tdb_null, /* indata */
1414 mem_ctx,
1415 &outdata,
1416 &cstatus);
1417 if (ret != 0 || cstatus != 0) {
1418 DBG_ERR("ctdb_control for getpublicips failed ret:%d cstatus:%d\n",
1419 ret, (int)cstatus);
1420 return -1;
1423 min_dsize = offsetof(struct ctdb_public_ip_list_old, ips);
1424 if (outdata.dsize < min_dsize) {
1425 DBG_ERR("outdata.dsize=%zu < min_dsize=%zu\n",
1426 outdata.dsize, min_dsize);
1427 return -1;
1429 max_ips = (outdata.dsize - min_dsize)/sizeof(struct ctdb_public_ip);
1430 ips = (struct ctdb_public_ip_list_old *)outdata.dptr;
1431 if ((size_t)ips->num > max_ips) {
1432 DBG_ERR("ips->num=%zu > max_ips=%zu\n",
1433 (size_t)ips->num, max_ips);
1434 return -1;
1437 *_ips = ips;
1438 return 0;
1441 static struct samba_sockaddr ctdbd_sock_addr_to_samba(const ctdb_sock_addr *c)
1443 struct samba_sockaddr s = {};
1445 switch (c->sa.sa_family) {
1446 case AF_INET:
1447 s.u.in = c->ip;
1448 break;
1449 case AF_INET6:
1451 * ctdb always requires HAVE_IPV6,
1452 * so we don't need an ifdef here.
1454 s.u.in6 = c->ip6;
1455 break;
1456 default:
1458 * ctdb_sock_addr only supports ipv4 and ipv6
1460 smb_panic(__location__);
1461 break;
1464 return s;
1467 int ctdbd_public_ip_foreach(struct ctdbd_connection *conn,
1468 int (*cb)(uint32_t total_ip_count,
1469 const struct sockaddr_storage *ip,
1470 bool is_movable_ip,
1471 void *private_data),
1472 void *private_data)
1474 uint32_t i;
1475 struct ctdb_public_ip_list_old *ips = NULL;
1476 int ret = ENOMEM;
1477 TALLOC_CTX *frame = talloc_stackframe();
1479 ret = ctdbd_control_get_public_ips(conn, 0, frame, &ips);
1480 if (ret < 0) {
1481 ret = EIO;
1482 goto out_free;
1485 for (i=0; i < ips->num; i++) {
1486 const ctdb_sock_addr *addr = &ips->ips[i].addr;
1487 struct samba_sockaddr tmp = ctdbd_sock_addr_to_samba(addr);
1489 ret = cb(ips->num,
1490 &tmp.u.ss,
1491 true, /* all ctdb public ips are movable */
1492 private_data);
1493 if (ret != 0) {
1494 goto out_free;
1498 ret = 0;
1499 out_free:
1500 TALLOC_FREE(frame);
1501 return ret;
1505 call a control on the local node
1507 int ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1508 uint64_t srvid, uint32_t flags, TDB_DATA data,
1509 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1510 int32_t *cstatus)
1512 return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data,
1513 mem_ctx, outdata, cstatus);
1516 int ctdb_watch_us(struct ctdbd_connection *conn)
1518 struct ctdb_notify_data_old reg_data;
1519 size_t struct_len;
1520 int ret;
1521 int32_t cstatus;
1523 reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1524 reg_data.len = 1;
1525 reg_data.notify_data[0] = 0;
1527 struct_len = offsetof(struct ctdb_notify_data_old,
1528 notify_data) + reg_data.len;
1530 ret = ctdbd_control_local(
1531 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1532 make_tdb_data((uint8_t *)&reg_data, struct_len),
1533 NULL, NULL, &cstatus);
1534 if (ret != 0) {
1535 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1536 strerror(ret)));
1538 return ret;
1541 int ctdb_unwatch(struct ctdbd_connection *conn)
1543 uint64_t srvid = CTDB_SRVID_SAMBA_NOTIFY;
1544 int ret;
1545 int32_t cstatus;
1547 ret = ctdbd_control_local(
1548 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1549 make_tdb_data((uint8_t *)&srvid, sizeof(srvid)),
1550 NULL, NULL, &cstatus);
1551 if (ret != 0) {
1552 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1553 strerror(ret)));
1555 return ret;
1558 int ctdbd_probe(const char *sockname, int timeout)
1561 * Do a very early check if ctdbd is around to avoid an abort and core
1562 * later
1564 struct ctdbd_connection *conn = NULL;
1565 int ret;
1567 ret = ctdbd_init_connection(talloc_tos(), sockname, timeout,
1568 &conn);
1571 * We only care if we can connect.
1573 TALLOC_FREE(conn);
1575 return ret;
1578 static int ctdbd_connection_destructor(struct ctdbd_connection *c)
1580 if (c->fd != -1) {
1581 close(c->fd);
1582 c->fd = -1;
1584 return 0;
1587 void ctdbd_prep_hdr_next_reqid(
1588 struct ctdbd_connection *conn, struct ctdb_req_header *hdr)
1590 *hdr = (struct ctdb_req_header) {
1591 .ctdb_magic = CTDB_MAGIC,
1592 .ctdb_version = CTDB_PROTOCOL,
1593 .reqid = ctdbd_next_reqid(conn),
1594 .destnode = CTDB_CURRENT_NODE,
/*
 * State of a ctdbd_pkt_read_send() request.
 */
struct ctdbd_pkt_read_state {
	/*
	 * The complete packet, length prefix included, once the read has
	 * finished. ctdbd_pkt_read_recv() moves it to the caller.
	 */
	uint8_t *pkt;
};

/* Tells read_packet_send() how many more bytes make up the packet. */
static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data);
/* Completion callback of the underlying read_packet_send(). */
static void ctdbd_pkt_read_done(struct tevent_req *subreq);
1606 static struct tevent_req *ctdbd_pkt_read_send(
1607 TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
1609 struct tevent_req *req = NULL, *subreq = NULL;
1610 struct ctdbd_pkt_read_state *state = NULL;
1612 req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
1613 if (req == NULL) {
1614 return NULL;
1616 subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
1617 if (tevent_req_nomem(subreq, req)) {
1618 return tevent_req_post(req, ev);
1620 tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
1621 return req;
1624 static ssize_t ctdbd_pkt_read_more(
1625 uint8_t *buf, size_t buflen, void *private_data)
1627 uint32_t msglen;
1628 if (buflen < 4) {
1629 return -1;
1631 if (buflen > 4) {
1632 return 0; /* Been here, done */
1634 memcpy(&msglen, buf, 4);
1636 if (msglen < sizeof(struct ctdb_req_header)) {
1637 return -1;
1639 return msglen - sizeof(msglen);
1642 static void ctdbd_pkt_read_done(struct tevent_req *subreq)
1644 struct tevent_req *req = tevent_req_callback_data(
1645 subreq, struct tevent_req);
1646 struct ctdbd_pkt_read_state *state = tevent_req_data(
1647 req, struct ctdbd_pkt_read_state);
1648 ssize_t nread;
1649 int err;
1651 nread = read_packet_recv(subreq, state, &state->pkt, &err);
1652 TALLOC_FREE(subreq);
1653 if (nread == -1) {
1654 tevent_req_error(req, err);
1655 return;
1657 tevent_req_done(req);
1660 static int ctdbd_pkt_read_recv(
1661 struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
1663 struct ctdbd_pkt_read_state *state = tevent_req_data(
1664 req, struct ctdbd_pkt_read_state);
1665 int err;
1667 if (tevent_req_is_unix_error(req, &err)) {
1668 return err;
1670 *pkt = talloc_move(mem_ctx, &state->pkt);
1671 tevent_req_received(req);
1672 return 0;
/* Start reading the next reply if requests are waiting for one. */
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
/* Completion of the shared per-connection read request. */
static void ctdbd_conn_received(struct tevent_req *subreq);

/*
 * State of a ctdbd_req_send() request.
 */
struct ctdbd_req_state {
	struct ctdbd_connection *conn;
	/*
	 * Event context used to defer this request's completion
	 * callback. It is also used for the shared read while this is
	 * the oldest pending request.
	 */
	struct tevent_context *ev;
	/* reqid copied from the request header; matched against replies */
	uint32_t reqid;
	/* the matching reply packet, set by ctdbd_conn_received() */
	struct ctdb_req_header *reply;
};
1685 static void ctdbd_req_unset_pending(struct tevent_req *req)
1687 struct ctdbd_req_state *state = tevent_req_data(
1688 req, struct ctdbd_req_state);
1689 struct ctdbd_connection *conn = state->conn;
1690 size_t num_pending = talloc_array_length(conn->pending);
1691 size_t i, num_after;
1693 tevent_req_set_cleanup_fn(req, NULL);
1695 if (num_pending == 1) {
1697 * conn->read_req is a child of conn->pending
1699 TALLOC_FREE(conn->pending);
1700 conn->read_req = NULL;
1701 return;
1704 for (i=0; i<num_pending; i++) {
1705 if (req == conn->pending[i]) {
1706 break;
1709 if (i == num_pending) {
1711 * Something's seriously broken. Just returning here is the
1712 * right thing nevertheless, the point of this routine is to
1713 * remove ourselves from conn->pending.
1715 return;
1718 num_after = num_pending - i - 1;
1719 if (num_after > 0) {
1720 memmove(&conn->pending[i],
1721 &conn->pending[i] + 1,
1722 sizeof(*conn->pending) * num_after);
1724 conn->pending = talloc_realloc(
1725 NULL, conn->pending, struct tevent_req *, num_pending - 1);
1728 static void ctdbd_req_cleanup(
1729 struct tevent_req *req, enum tevent_req_state req_state)
1731 ctdbd_req_unset_pending(req);
1734 static bool ctdbd_req_set_pending(struct tevent_req *req)
1736 struct ctdbd_req_state *state = tevent_req_data(
1737 req, struct ctdbd_req_state);
1738 struct ctdbd_connection *conn = state->conn;
1739 struct tevent_req **pending = NULL;
1740 size_t num_pending = talloc_array_length(conn->pending);
1741 bool ok;
1743 pending = talloc_realloc(
1744 conn, conn->pending, struct tevent_req *, num_pending + 1);
1745 if (pending == NULL) {
1746 return false;
1748 pending[num_pending] = req;
1749 conn->pending = pending;
1751 tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);
1753 ok = ctdbd_conn_receive_next(conn);
1754 if (!ok) {
1755 ctdbd_req_unset_pending(req);
1756 return false;
1759 return true;
1762 static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
1764 size_t num_pending = talloc_array_length(conn->pending);
1765 struct tevent_req *req = NULL;
1766 struct ctdbd_req_state *state = NULL;
1768 if (conn->read_req != NULL) {
1769 return true;
1771 if (num_pending == 0) {
1773 * done for now
1775 return true;
1778 req = conn->pending[0];
1779 state = tevent_req_data(req, struct ctdbd_req_state);
1781 conn->read_req = ctdbd_pkt_read_send(
1782 conn->pending, state->ev, conn->fd);
1783 if (conn->read_req == NULL) {
1784 return false;
1786 tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
1787 return true;
1790 static void ctdbd_conn_received(struct tevent_req *subreq)
1792 struct ctdbd_connection *conn = tevent_req_callback_data(
1793 subreq, struct ctdbd_connection);
1794 TALLOC_CTX *frame = talloc_stackframe();
1795 uint8_t *pkt = NULL;
1796 int ret;
1797 struct ctdb_req_header *hdr = NULL;
1798 uint32_t reqid;
1799 struct tevent_req *req = NULL;
1800 struct ctdbd_req_state *state = NULL;
1801 size_t i, num_pending;
1802 bool ok;
1804 SMB_ASSERT(subreq == conn->read_req);
1805 conn->read_req = NULL;
1807 ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
1808 TALLOC_FREE(subreq);
1809 if (ret != 0) {
1810 cluster_fatal("ctdbd_pkt_read failed\n");
1813 hdr = (struct ctdb_req_header *)pkt;
1814 reqid = hdr->reqid;
1815 num_pending = talloc_array_length(conn->pending);
1817 for (i=0; i<num_pending; i++) {
1818 req = conn->pending[i];
1819 state = tevent_req_data(req, struct ctdbd_req_state);
1820 if (state->reqid == reqid) {
1821 break;
1825 if (i == num_pending) {
1826 /* not found */
1827 TALLOC_FREE(frame);
1828 return;
1831 state->reply = talloc_move(state, &hdr);
1832 tevent_req_defer_callback(req, state->ev);
1833 tevent_req_done(req);
1835 TALLOC_FREE(frame);
1837 ok = ctdbd_conn_receive_next(conn);
1838 if (!ok) {
1839 cluster_fatal("ctdbd_conn_receive_next failed\n");
1843 static void ctdbd_req_written(struct tevent_req *subreq);
1845 struct tevent_req *ctdbd_req_send(
1846 TALLOC_CTX *mem_ctx,
1847 struct tevent_context *ev,
1848 struct ctdbd_connection *conn,
1849 struct iovec *iov,
1850 size_t num_iov)
1852 struct tevent_req *req = NULL, *subreq = NULL;
1853 struct ctdbd_req_state *state = NULL;
1854 struct ctdb_req_header *hdr = NULL;
1855 bool ok;
1857 req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
1858 if (req == NULL) {
1859 return NULL;
1861 state->conn = conn;
1862 state->ev = ev;
1864 if ((num_iov == 0) ||
1865 (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
1866 tevent_req_error(req, EINVAL);
1867 return tevent_req_post(req, ev);
1869 hdr = iov[0].iov_base;
1870 state->reqid = hdr->reqid;
1872 ok = ctdbd_req_set_pending(req);
1873 if (!ok) {
1874 tevent_req_oom(req);
1875 return tevent_req_post(req, ev);
1878 subreq = writev_send(
1879 state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
1880 if (tevent_req_nomem(subreq, req)) {
1881 return tevent_req_post(req, ev);
1883 tevent_req_set_callback(subreq, ctdbd_req_written, req);
1885 return req;
1888 static void ctdbd_req_written(struct tevent_req *subreq)
1890 struct tevent_req *req = tevent_req_callback_data(
1891 subreq, struct tevent_req);
1892 ssize_t nwritten;
1893 int err;
1895 nwritten = writev_recv(subreq, &err);
1896 TALLOC_FREE(subreq);
1897 if (nwritten == -1) {
1898 tevent_req_error(req, err);
1899 return;
1903 int ctdbd_req_recv(
1904 struct tevent_req *req,
1905 TALLOC_CTX *mem_ctx,
1906 struct ctdb_req_header **reply)
1908 struct ctdbd_req_state *state = tevent_req_data(
1909 req, struct ctdbd_req_state);
1910 int err;
1912 if (tevent_req_is_unix_error(req, &err)) {
1913 return err;
1915 *reply = talloc_move(mem_ctx, &state->reply);
1916 tevent_req_received(req);
1917 return 0;
/*
 * State of a ctdbd_parse_send() request.
 */
struct ctdbd_parse_state {
	struct tevent_context *ev;
	struct ctdbd_connection *conn;
	uint32_t reqid;
	/* private copy of the key; dptr points at _keybuf or a talloc copy */
	TDB_DATA key;
	/* inline storage for keys of up to 64 bytes, avoiding an allocation */
	uint8_t _keybuf[64];
	/* CTDB_REQ_CALL header, sent as iov[0] */
	struct ctdb_req_call_old ctdb_req;
	/* iov[0]: request header, iov[1]: key bytes */
	struct iovec iov[2];
	/* called with the key and record data for a non-empty reply */
	void (*parser)(TDB_DATA key,
		       TDB_DATA data,
		       void *private_data);
	void *private_data;
};

/* Completion of the ctdbd_req_send() sub-request. */
static void ctdbd_parse_done(struct tevent_req *subreq);
1936 struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
1937 struct tevent_context *ev,
1938 struct ctdbd_connection *conn,
1939 uint32_t db_id,
1940 TDB_DATA key,
1941 bool local_copy,
1942 void (*parser)(TDB_DATA key,
1943 TDB_DATA data,
1944 void *private_data),
1945 void *private_data,
1946 enum dbwrap_req_state *req_state)
1948 struct tevent_req *req = NULL;
1949 struct ctdbd_parse_state *state = NULL;
1950 uint32_t flags;
1951 uint32_t packet_length;
1952 struct tevent_req *subreq = NULL;
1954 req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
1955 if (req == NULL) {
1956 *req_state = DBWRAP_REQ_ERROR;
1957 return NULL;
1960 *req_state = DBWRAP_REQ_DISPATCHED;
1962 *state = (struct ctdbd_parse_state) {
1963 .ev = ev,
1964 .conn = conn,
1965 .reqid = ctdbd_next_reqid(conn),
1966 .parser = parser,
1967 .private_data = private_data,
1970 flags = local_copy ? CTDB_WANT_READONLY : 0;
1971 packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
1974 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires that
1975 * all passed iov elements have a lifetime longer that the tevent_req
1976 * returned by ctdb_pkt_send_send(). This is required continue sending a
1977 * the low level request into the ctdb socket, if a higher level
1978 * ('this') request is canceled (or talloc free'd) by the application
1979 * layer, without sending invalid packets to ctdb.
1981 if (key.dsize > sizeof(state->_keybuf)) {
1982 state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
1983 if (tevent_req_nomem(state->key.dptr, req)) {
1984 return tevent_req_post(req, ev);
1986 } else {
1987 memcpy(state->_keybuf, key.dptr, key.dsize);
1988 state->key.dptr = state->_keybuf;
1990 state->key.dsize = key.dsize;
1992 state->ctdb_req.hdr.length = packet_length;
1993 state->ctdb_req.hdr.ctdb_magic = CTDB_MAGIC;
1994 state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
1995 state->ctdb_req.hdr.operation = CTDB_REQ_CALL;
1996 state->ctdb_req.hdr.reqid = state->reqid;
1997 state->ctdb_req.flags = flags;
1998 state->ctdb_req.callid = CTDB_FETCH_FUNC;
1999 state->ctdb_req.db_id = db_id;
2000 state->ctdb_req.keylen = state->key.dsize;
2002 state->iov[0].iov_base = &state->ctdb_req;
2003 state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
2004 state->iov[1].iov_base = state->key.dptr;
2005 state->iov[1].iov_len = state->key.dsize;
2007 subreq = ctdbd_req_send(
2008 state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
2009 if (tevent_req_nomem(subreq, req)) {
2010 *req_state = DBWRAP_REQ_ERROR;
2011 return tevent_req_post(req, ev);
2013 tevent_req_set_callback(subreq, ctdbd_parse_done, req);
2015 return req;
2018 static void ctdbd_parse_done(struct tevent_req *subreq)
2020 struct tevent_req *req = tevent_req_callback_data(
2021 subreq, struct tevent_req);
2022 struct ctdbd_parse_state *state = tevent_req_data(
2023 req, struct ctdbd_parse_state);
2024 struct ctdb_req_header *hdr = NULL;
2025 struct ctdb_reply_call_old *reply = NULL;
2026 int ret;
2028 ret = ctdbd_req_recv(subreq, state, &hdr);
2029 TALLOC_FREE(subreq);
2030 if (tevent_req_error(req, ret)) {
2031 DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
2032 return;
2034 SMB_ASSERT(hdr != NULL);
2036 if (hdr->operation != CTDB_REPLY_CALL) {
2037 DBG_ERR("received invalid reply\n");
2038 ctdb_packet_dump(hdr);
2039 tevent_req_error(req, EIO);
2040 return;
2043 reply = (struct ctdb_reply_call_old *)hdr;
2045 if (reply->datalen == 0) {
2047 * Treat an empty record as non-existing
2049 tevent_req_error(req, ENOENT);
2050 return;
2053 state->parser(state->key,
2054 make_tdb_data(&reply->data[0], reply->datalen),
2055 state->private_data);
2057 tevent_req_done(req);
2058 return;
/*
 * Collect the result of ctdbd_parse_send().
 *
 * Returns 0 once the parser has run on the record. Otherwise returns a
 * Unix error code, e.g. ENOENT for an empty record or EIO for an
 * unexpected reply.
 */
int ctdbd_parse_recv(struct tevent_req *req)
{
	int unix_err = tevent_req_simple_recv_unix(req);

	return unix_err;
}