s3:ctdbd_conn: split out ctdbd_control_get_nodemap()
[samba.git] / source3 / lib / ctdbd_conn.c
blobfa18071a78104500a427c66ffeca7366bd6ca3d5
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "replace.h"
22 #include <tevent.h>
23 #include "util_tdb.h"
24 #include "serverid.h"
25 #include "ctdbd_conn.h"
26 #include "system/select.h"
27 #include "lib/util/util_net.h"
28 #include "lib/util/sys_rw_data.h"
29 #include "lib/util/iov_buf.h"
30 #include "lib/util/select.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/talloc_stack.h"
33 #include "lib/util/genrand.h"
34 #include "lib/util/fault.h"
35 #include "lib/util/dlinklist.h"
36 #include "lib/util/tevent_unix.h"
37 #include "lib/util/sys_rw.h"
38 #include "lib/util/blocking.h"
39 #include "ctdb/include/ctdb_protocol.h"
40 #include "lib/async_req/async_sock.h"
/* paths to these include files come from --with-ctdb= in configure */

/*
 * One locally registered message handler.
 *
 * Messages from ctdbd whose srvid equals "srvid" are passed to "cb"
 * together with "private_data" (see ctdbd_msg_call_back()). "cb" may be
 * NULL; such an entry only keeps the srvid registered with ctdbd and is
 * skipped when dispatching.
 */
struct ctdbd_srvid_cb {
	uint64_t srvid;
	int (*cb)(struct tevent_context *ev,
		  uint32_t src_vnn, uint32_t dst_vnn,
		  uint64_t dst_srvid,
		  const uint8_t *msg, size_t msglen,
		  void *private_data);
	void *private_data;
};
/*
 * State of one connection to the local ctdbd.
 */
struct ctdbd_connection {
	/* last request id handed out by ctdbd_next_reqid(), never 0 */
	uint32_t reqid;
	/* our own node number, fetched via CTDB_CONTROL_GET_PNN at setup */
	uint32_t our_vnn;
	/* random srvid registered at setup, used e.g. for traverses */
	uint64_t rand_srvid;
	/* talloc array of registered message handlers */
	struct ctdbd_srvid_cb *callbacks;
	/* unix domain socket connected to ctdbd */
	int fd;
	/* read timeout handed to sys_poll_intr(); -1 means wait forever */
	int timeout;

	/*
	 * Outgoing queue for writev_send of asynchronous ctdb requests
	 */
	struct tevent_queue *outgoing;
	/*
	 * talloc array of in-flight async requests; while it is non-empty
	 * the synchronous helpers refuse to run
	 */
	struct tevent_req **pending;
	/* NOTE(review): presumably the outstanding async read -- confirm */
	struct tevent_req *read_req;
};
70 static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
72 size_t len = talloc_array_length(conn->pending);
73 return (len != 0);
76 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
78 conn->reqid += 1;
79 if (conn->reqid == 0) {
80 conn->reqid += 1;
82 return conn->reqid;
/*
 * Send a control to ctdbd and (unless CTDB_CTRL_FLAG_NOREPLY is set)
 * wait for its reply. Forward declaration; the definition follows
 * further down in this file.
 */
static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn, uint32_t opcode,
			 uint64_t srvid, uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			 int32_t *cstatus);
/*
 * Terminate the process after a fatal communication error with ctdbd.
 *
 * "why" is logged at level 0, then the process exits with status 1.
 * smb_panic() is deliberately not used here: writing a core file would
 * delay the exit, and the process id has to be released at once so that
 * someone else can take over without getting sharing violations.
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));

	_exit(1);
}
108 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
110 if (DEBUGLEVEL < 11) {
111 return;
113 DEBUGADD(11, ("len=%"PRIu32", magic=%"PRIu32", vers=%"PRIu32", "
114 "gen=%"PRIu32", op=%"PRIu32", reqid=%"PRIu32"\n",
115 hdr->length,
116 hdr->ctdb_magic,
117 hdr->ctdb_version,
118 hdr->generation,
119 hdr->operation,
120 hdr->reqid));
124 * Register a srvid with ctdbd
126 int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
127 int (*cb)(struct tevent_context *ev,
128 uint32_t src_vnn, uint32_t dst_vnn,
129 uint64_t dst_srvid,
130 const uint8_t *msg, size_t msglen,
131 void *private_data),
132 void *private_data)
134 size_t num_callbacks = talloc_array_length(conn->callbacks);
135 struct ctdbd_srvid_cb *tmp;
136 bool need_register = true;
137 size_t i;
139 for (i = 0; i < num_callbacks; i++) {
140 struct ctdbd_srvid_cb *c = &conn->callbacks[i];
142 if (c->srvid == srvid) {
143 need_register = false;
144 break;
148 if (need_register) {
149 int ret;
150 int32_t cstatus;
152 ret = ctdbd_control_local(conn, CTDB_CONTROL_REGISTER_SRVID,
153 srvid, 0, tdb_null, NULL, NULL,
154 &cstatus);
155 if (ret != 0) {
156 return ret;
161 tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
162 num_callbacks + 1);
163 if (tmp == NULL) {
164 return ENOMEM;
166 conn->callbacks = tmp;
168 conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
169 .srvid = srvid, .cb = cb, .private_data = private_data
172 return 0;
/*
 * Remove every callback entry registered for "srvid" with exactly this
 * "cb"/"private_data" pair from conn->callbacks.
 *
 * ctdbd is only asked to drop the srvid (CTDB_CONTROL_DEREGISTER_SRVID)
 * when at least one entry was removed and no other callback for the same
 * srvid remains. Failure of that control is ignored.
 */
void deregister_from_ctdbd(struct ctdbd_connection *conn,
			   uint64_t srvid,
			   int (*cb)(struct tevent_context *ev,
				     uint32_t src_vnn,
				     uint32_t dst_vnn,
				     uint64_t dst_srvid,
				     const uint8_t *msg,
				     size_t msglen,
				     void *private_data),
			   void *private_data)
{
	struct ctdbd_srvid_cb *cbs = conn->callbacks;
	size_t i, num_callbacks = talloc_array_length(cbs);
	bool need_deregister = false;
	bool keep_registration = false;

	if (num_callbacks == 0) {
		return;
	}

	for (i = 0; i < num_callbacks;) {
		struct ctdbd_srvid_cb *c = &cbs[i];

		if (c->srvid != srvid) {
			i++;
			continue;
		}

		if ((c->cb == cb) && (c->private_data == private_data)) {
			need_deregister = true;
			/*
			 * Drop entry i. "i" is not advanced: presumably
			 * ARRAY_DEL_ELEMENT moves the following entries down,
			 * so index i now holds the next entry -- confirm.
			 */
			ARRAY_DEL_ELEMENT(cbs, i, num_callbacks);
			num_callbacks--;
			continue;
		}

		/* another callback still wants messages for this srvid */
		keep_registration = true;
		i++;
	}

	/* shrink the array to the entries that are left */
	conn->callbacks = talloc_realloc(conn,
					 cbs,
					 struct ctdbd_srvid_cb,
					 num_callbacks);

	if (keep_registration) {
		need_deregister = false;
	}

	if (need_deregister) {
		int ret;
		int32_t cstatus;

		ret = ctdbd_control_local(conn, CTDB_CONTROL_DEREGISTER_SRVID,
					  srvid, 0, tdb_null, NULL, NULL,
					  &cstatus);
		if (ret != 0) {
			/*
			 * If CTDB_CONTROL_DEREGISTER_SRVID fails we may still
			 * get messages later, but we don't have a callback
			 * anymore, we just ignore these.
			 */
		}
	}

	return;
}
242 static int ctdbd_msg_call_back(struct tevent_context *ev,
243 struct ctdbd_connection *conn,
244 struct ctdb_req_message_old *msg)
246 uint32_t msg_len;
247 size_t i, num_callbacks;
249 msg_len = msg->hdr.length;
250 if (msg_len < offsetof(struct ctdb_req_message_old, data)) {
251 DBG_DEBUG("len %"PRIu32" too small\n", msg_len);
252 return 0;
254 msg_len -= offsetof(struct ctdb_req_message_old, data);
256 if (msg_len < msg->datalen) {
257 DBG_DEBUG("msg_len=%"PRIu32" < msg->datalen=%"PRIu32"\n",
258 msg_len, msg->datalen);
259 return 0;
262 num_callbacks = talloc_array_length(conn->callbacks);
264 for (i=0; i<num_callbacks; i++) {
265 struct ctdbd_srvid_cb *cb = &conn->callbacks[i];
267 if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
268 int ret;
270 ret = cb->cb(ev,
271 msg->hdr.srcnode, msg->hdr.destnode,
272 msg->srvid, msg->data, msg->datalen,
273 cb->private_data);
274 if (ret != 0) {
275 return ret;
279 return 0;
283 * get our vnn from the cluster
285 static int get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
287 int32_t cstatus=-1;
288 int ret;
289 ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_PNN, 0, 0,
290 tdb_null, NULL, NULL, &cstatus);
291 if (ret != 0) {
292 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
293 return ret;
295 *vnn = (uint32_t)cstatus;
296 return ret;
299 static int ctdbd_control_get_nodemap(struct ctdbd_connection *conn,
300 TALLOC_CTX *mem_ctx,
301 struct ctdb_node_map_old **_nodemap)
303 int32_t cstatus=-1;
304 TDB_DATA outdata = {0};
305 int ret;
307 ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_NODEMAP, 0, 0,
308 tdb_null, mem_ctx, &outdata, &cstatus);
309 if (ret != 0) {
310 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
311 return ret;
313 if ((cstatus != 0) || (outdata.dptr == NULL)) {
314 DEBUG(2, ("Received invalid ctdb data\n"));
315 return EINVAL;
318 *_nodemap = (struct ctdb_node_map_old *)outdata.dptr;
319 return 0;
323 * Are we active (i.e. not banned or stopped?)
325 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
327 struct ctdb_node_map_old *m = NULL;
328 bool ok = false;
329 uint32_t i;
330 int ret;
332 ret = ctdbd_control_get_nodemap(conn, talloc_tos(), &m);
333 if (ret != 0) {
334 DEBUG(1, ("ctdbd_control_get_nodemap() failed: %s\n", strerror(ret)));
335 return false;
338 for (i=0; i<m->num; i++) {
339 if (vnn == m->nodes[i].pnn) {
340 break;
344 if (i == m->num) {
345 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
346 (int)vnn));
347 goto fail;
350 if ((m->nodes[i].flags & NODE_FLAGS_INACTIVE) != 0) {
351 DEBUG(2, ("Node has status %x, not active\n",
352 (int)m->nodes[i].flags));
353 goto fail;
356 ok = true;
357 fail:
358 TALLOC_FREE(m);
359 return ok;
/*
 * Return our own node number as cached in the connection; it is filled
 * in by get_cluster_vnn() when the connection is (re)initialised.
 */
uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
{
	return conn->our_vnn;
}
368 * Get us a ctdb connection
371 static int ctdbd_connect(const char *sockname, int *pfd)
373 struct samba_sockaddr addr = {
374 .sa_socklen = sizeof(struct sockaddr_un),
375 .u = {
376 .un = {
377 .sun_family = AF_UNIX,
381 int fd;
382 size_t namelen;
383 int ret;
385 fd = socket(AF_UNIX, SOCK_STREAM, 0);
386 if (fd == -1) {
387 int err = errno;
388 DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
389 return err;
392 namelen = strlcpy(addr.u.un.sun_path,
393 sockname,
394 sizeof(addr.u.un.sun_path));
395 if (namelen >= sizeof(addr.u.un.sun_path)) {
396 DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
397 sockname));
398 close(fd);
399 return ENAMETOOLONG;
402 ret = connect(fd, &addr.u.sa, addr.sa_socklen);
403 if (ret == -1) {
404 int err = errno;
405 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
406 strerror(err)));
407 close(fd);
408 return err;
411 *pfd = fd;
412 return 0;
415 static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
416 struct ctdb_req_header **result)
418 struct ctdb_req_header *req;
419 uint32_t msglen;
420 ssize_t nread;
422 if (timeout != -1) {
423 struct pollfd pfd = { .fd = fd, .events = POLLIN };
424 int ret;
426 ret = sys_poll_intr(&pfd, 1, timeout);
427 if (ret == -1) {
428 return errno;
430 if (ret == 0) {
431 return ETIMEDOUT;
433 if (ret != 1) {
434 return EIO;
438 nread = read_data(fd, &msglen, sizeof(msglen));
439 if (nread == -1) {
440 return errno;
442 if (nread == 0) {
443 return EIO;
446 if (msglen < sizeof(struct ctdb_req_header)) {
447 return EIO;
450 req = talloc_size(mem_ctx, msglen);
451 if (req == NULL) {
452 return ENOMEM;
454 talloc_set_name_const(req, "struct ctdb_req_header");
456 req->length = msglen;
458 nread = read_data(fd, ((char *)req) + sizeof(msglen),
459 msglen - sizeof(msglen));
460 if (nread == -1) {
461 TALLOC_FREE(req);
462 return errno;
464 if (nread == 0) {
465 TALLOC_FREE(req);
466 return EIO;
469 *result = req;
470 return 0;
474 * Read a full ctdbd request. If we have a messaging context, defer incoming
475 * messages that might come in between.
478 static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
479 TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
481 struct ctdb_req_header *hdr = NULL;
482 int ret;
484 next_pkt:
486 ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
487 if (ret != 0) {
488 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
489 cluster_fatal("failed to read data from ctdbd\n");
490 return -1;
492 SMB_ASSERT(hdr != NULL);
494 DEBUG(11, ("Received ctdb packet\n"));
495 ctdb_packet_dump(hdr);
497 if (hdr->operation == CTDB_REQ_MESSAGE) {
498 struct ctdb_req_message_old *msg = (struct ctdb_req_message_old *)hdr;
500 ret = ctdbd_msg_call_back(NULL, conn, msg);
501 if (ret != 0) {
502 TALLOC_FREE(hdr);
503 return ret;
506 TALLOC_FREE(hdr);
507 goto next_pkt;
510 if ((reqid != 0) && (hdr->reqid != reqid)) {
511 /* we got the wrong reply */
512 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
513 "been %u\n", hdr->reqid, reqid));
514 TALLOC_FREE(hdr);
515 goto next_pkt;
518 *result = talloc_move(mem_ctx, &hdr);
520 return 0;
523 static int ctdbd_connection_destructor(struct ctdbd_connection *c);
526 * Get us a ctdbd connection
529 static int ctdbd_init_connection_internal(TALLOC_CTX *mem_ctx,
530 const char *sockname, int timeout,
531 struct ctdbd_connection *conn)
533 int ret;
535 conn->timeout = timeout;
536 if (conn->timeout == 0) {
537 conn->timeout = -1;
540 ret = ctdbd_connect(sockname, &conn->fd);
541 if (ret != 0) {
542 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
543 return ret;
545 talloc_set_destructor(conn, ctdbd_connection_destructor);
547 ret = get_cluster_vnn(conn, &conn->our_vnn);
548 if (ret != 0) {
549 DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret)));
550 return ret;
553 if (!ctdbd_working(conn, conn->our_vnn)) {
554 DEBUG(2, ("Node is not working, can not connect\n"));
555 return EIO;
558 generate_random_buffer((unsigned char *)&conn->rand_srvid,
559 sizeof(conn->rand_srvid));
561 ret = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
562 if (ret != 0) {
563 DEBUG(5, ("Could not register random srvid: %s\n",
564 strerror(ret)));
565 return ret;
568 return 0;
571 int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
572 const char *sockname, int timeout,
573 struct ctdbd_connection **pconn)
575 struct ctdbd_connection *conn;
576 int ret;
578 if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
579 DEBUG(0, ("talloc failed\n"));
580 return ENOMEM;
583 ret = ctdbd_init_connection_internal(mem_ctx,
584 sockname,
585 timeout,
586 conn);
587 if (ret != 0) {
588 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
589 strerror(ret));
590 goto fail;
593 *pconn = conn;
594 return 0;
596 fail:
597 TALLOC_FREE(conn);
598 return ret;
601 int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
602 const char *sockname, int timeout,
603 struct ctdbd_connection *conn)
605 int ret;
607 ret = ctdbd_connection_destructor(conn);
608 if (ret != 0) {
609 DBG_ERR("ctdbd_connection_destructor failed\n");
610 return ret;
613 ret = ctdbd_init_connection_internal(mem_ctx,
614 sockname,
615 timeout,
616 conn);
617 if (ret != 0) {
618 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
619 strerror(ret));
620 return ret;
623 return 0;
626 int ctdbd_init_async_connection(
627 TALLOC_CTX *mem_ctx,
628 const char *sockname,
629 int timeout,
630 struct ctdbd_connection **pconn)
632 struct ctdbd_connection *conn = NULL;
633 int ret;
635 *pconn = NULL;
637 ret = ctdbd_init_connection(mem_ctx, sockname, timeout, &conn);
638 if (ret != 0) {
639 return ret;
642 ret = set_blocking(conn->fd, false);
643 if (ret == -1) {
644 int err = errno;
645 SMB_ASSERT(err != 0);
646 TALLOC_FREE(conn);
647 return err;
650 conn->outgoing = tevent_queue_create(conn, "ctdb async outgoing");
651 if (conn->outgoing == NULL) {
652 TALLOC_FREE(conn);
653 return ENOMEM;
656 *pconn = conn;
657 return 0;
/*
 * Return the file descriptor of the socket connected to ctdbd.
 */
int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
{
	return conn->fd;
}
666 * Packet handler to receive and handle a ctdb message
668 static int ctdb_handle_message(struct tevent_context *ev,
669 struct ctdbd_connection *conn,
670 struct ctdb_req_header *hdr)
672 struct ctdb_req_message_old *msg;
674 if (hdr->operation != CTDB_REQ_MESSAGE) {
675 DEBUG(0, ("Received async msg of type %u, discarding\n",
676 hdr->operation));
677 return EINVAL;
680 msg = (struct ctdb_req_message_old *)hdr;
682 ctdbd_msg_call_back(ev, conn, msg);
684 return 0;
687 void ctdbd_socket_readable(struct tevent_context *ev,
688 struct ctdbd_connection *conn)
690 struct ctdb_req_header *hdr = NULL;
691 int ret;
693 ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
694 if (ret != 0) {
695 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
696 cluster_fatal("failed to read data from ctdbd\n");
698 SMB_ASSERT(hdr != NULL);
700 ret = ctdb_handle_message(ev, conn, hdr);
702 TALLOC_FREE(hdr);
704 if (ret != 0) {
705 DEBUG(10, ("could not handle incoming message: %s\n",
706 strerror(ret)));
710 int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
711 uint32_t dst_vnn, uint64_t dst_srvid,
712 const struct iovec *iov, int iovlen)
714 struct ctdb_req_message_old r;
715 struct iovec iov2[iovlen+1];
716 size_t buflen = iov_buflen(iov, iovlen);
717 ssize_t nwritten;
719 r.hdr.length = offsetof(struct ctdb_req_message_old, data) + buflen;
720 r.hdr.ctdb_magic = CTDB_MAGIC;
721 r.hdr.ctdb_version = CTDB_PROTOCOL;
722 r.hdr.generation = 1;
723 r.hdr.operation = CTDB_REQ_MESSAGE;
724 r.hdr.destnode = dst_vnn;
725 r.hdr.srcnode = conn->our_vnn;
726 r.hdr.reqid = 0;
727 r.srvid = dst_srvid;
728 r.datalen = buflen;
730 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
731 ctdb_packet_dump(&r.hdr);
733 iov2[0].iov_base = &r;
734 iov2[0].iov_len = offsetof(struct ctdb_req_message_old, data);
735 memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));
737 nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
738 if (nwritten == -1) {
739 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
740 cluster_fatal("cluster dispatch daemon msg write error\n");
743 return 0;
747 * send/recv a generic ctdb control message
749 static int ctdbd_control(struct ctdbd_connection *conn,
750 uint32_t vnn, uint32_t opcode,
751 uint64_t srvid, uint32_t flags,
752 TDB_DATA data,
753 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
754 int32_t *cstatus)
756 struct ctdb_req_control_old req;
757 struct ctdb_req_header *hdr;
758 struct ctdb_reply_control_old *reply = NULL;
759 struct iovec iov[2];
760 ssize_t nwritten;
761 int ret;
763 if (ctdbd_conn_has_async_reqs(conn)) {
765 * Can't use sync call while an async call is in flight. Adding
766 * this check as a safety net. We'll be using different
767 * connections for sync and async requests, so this shouldn't
768 * happen, but who knows...
770 DBG_ERR("Async ctdb req on sync connection\n");
771 return EINVAL;
774 ZERO_STRUCT(req);
775 req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
776 req.hdr.ctdb_magic = CTDB_MAGIC;
777 req.hdr.ctdb_version = CTDB_PROTOCOL;
778 req.hdr.operation = CTDB_REQ_CONTROL;
779 req.hdr.reqid = ctdbd_next_reqid(conn);
780 req.hdr.destnode = vnn;
781 req.opcode = opcode;
782 req.srvid = srvid;
783 req.datalen = data.dsize;
784 req.flags = flags;
786 DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32", vnn=%"PRIu32", "
787 "opcode=%"PRIu32", srvid=%"PRIu64"\n", req.hdr.reqid,
788 req.hdr.destnode, req.opcode, req.srvid);
789 ctdb_packet_dump(&req.hdr);
791 iov[0].iov_base = &req;
792 iov[0].iov_len = offsetof(struct ctdb_req_control_old, data);
793 iov[1].iov_base = data.dptr;
794 iov[1].iov_len = data.dsize;
796 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
797 if (nwritten == -1) {
798 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
799 cluster_fatal("cluster dispatch daemon msg write error\n");
802 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
803 if (cstatus) {
804 *cstatus = 0;
806 return 0;
809 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
810 if (ret != 0) {
811 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
812 return ret;
815 if (hdr->operation != CTDB_REPLY_CONTROL) {
816 DEBUG(0, ("received invalid reply\n"));
817 TALLOC_FREE(hdr);
818 return EIO;
820 reply = (struct ctdb_reply_control_old *)hdr;
822 if (outdata) {
823 if (!(outdata->dptr = (uint8_t *)talloc_memdup(
824 mem_ctx, reply->data, reply->datalen))) {
825 TALLOC_FREE(reply);
826 return ENOMEM;
828 outdata->dsize = reply->datalen;
830 if (cstatus) {
831 (*cstatus) = reply->status;
834 TALLOC_FREE(reply);
835 return ret;
839 * see if a remote process exists
841 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn,
842 pid_t pid, uint64_t unique_id)
844 uint8_t buf[sizeof(pid)+sizeof(unique_id)];
845 int32_t cstatus = 0;
846 int ret;
848 if (unique_id == SERVERID_UNIQUE_ID_NOT_TO_VERIFY) {
849 ret = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS,
850 0, 0,
851 (TDB_DATA) { .dptr = (uint8_t *)&pid,
852 .dsize = sizeof(pid) },
853 NULL, NULL, &cstatus);
854 if (ret != 0) {
855 return false;
857 return (cstatus == 0);
860 memcpy(buf, &pid, sizeof(pid));
861 memcpy(buf+sizeof(pid), &unique_id, sizeof(unique_id));
863 ret = ctdbd_control(conn, vnn, CTDB_CONTROL_CHECK_PID_SRVID, 0, 0,
864 (TDB_DATA) { .dptr = buf, .dsize = sizeof(buf) },
865 NULL, NULL, &cstatus);
866 if (ret != 0) {
867 return false;
869 return (cstatus == 0);
873 * Get a db path
875 char *ctdbd_dbpath(struct ctdbd_connection *conn,
876 TALLOC_CTX *mem_ctx, uint32_t db_id)
878 int ret;
879 TDB_DATA data;
880 TDB_DATA rdata = {0};
881 int32_t cstatus = 0;
883 data.dptr = (uint8_t*)&db_id;
884 data.dsize = sizeof(db_id);
886 ret = ctdbd_control_local(conn, CTDB_CONTROL_GETDBPATH, 0, 0, data,
887 mem_ctx, &rdata, &cstatus);
888 if ((ret != 0) || cstatus != 0) {
889 DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s\n",
890 strerror(ret)));
891 TALLOC_FREE(rdata.dptr);
894 return (char *)rdata.dptr;
898 * attach to a ctdb database
900 int ctdbd_db_attach(struct ctdbd_connection *conn,
901 const char *name, uint32_t *db_id, bool persistent)
903 int ret;
904 TDB_DATA data = {0};
905 int32_t cstatus;
907 data = string_term_tdb_data(name);
909 ret = ctdbd_control_local(conn,
910 persistent
911 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
912 : CTDB_CONTROL_DB_ATTACH,
913 0, 0, data, NULL, &data, &cstatus);
914 if (ret != 0) {
915 DEBUG(0, (__location__ " ctdb_control for db_attach "
916 "failed: %s\n", strerror(ret)));
917 return ret;
920 if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
921 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
922 TALLOC_FREE(data.dptr);
923 return EIO;
926 *db_id = *(uint32_t *)data.dptr;
927 talloc_free(data.dptr);
929 return 0;
933 * force the migration of a record to this node
935 int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
937 struct ctdb_req_call_old req;
938 struct ctdb_req_header *hdr = NULL;
939 struct iovec iov[2];
940 ssize_t nwritten;
941 int ret;
943 if (ctdbd_conn_has_async_reqs(conn)) {
945 * Can't use sync call while an async call is in flight. Adding
946 * this check as a safety net. We'll be using different
947 * connections for sync and async requests, so this shouldn't
948 * happen, but who knows...
950 DBG_ERR("Async ctdb req on sync connection\n");
951 return EINVAL;
954 ZERO_STRUCT(req);
956 req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
957 req.hdr.ctdb_magic = CTDB_MAGIC;
958 req.hdr.ctdb_version = CTDB_PROTOCOL;
959 req.hdr.operation = CTDB_REQ_CALL;
960 req.hdr.reqid = ctdbd_next_reqid(conn);
961 req.flags = CTDB_IMMEDIATE_MIGRATION;
962 req.callid = CTDB_NULL_FUNC;
963 req.db_id = db_id;
964 req.keylen = key.dsize;
966 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
967 ctdb_packet_dump(&req.hdr);
969 iov[0].iov_base = &req;
970 iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
971 iov[1].iov_base = key.dptr;
972 iov[1].iov_len = key.dsize;
974 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
975 if (nwritten == -1) {
976 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
977 cluster_fatal("cluster dispatch daemon msg write error\n");
980 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
981 if (ret != 0) {
982 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
983 goto fail;
986 if (hdr->operation != CTDB_REPLY_CALL) {
987 if (hdr->operation == CTDB_REPLY_ERROR) {
988 DBG_ERR("received error from ctdb\n");
989 } else {
990 DBG_ERR("received invalid reply\n");
992 ret = EIO;
993 goto fail;
996 fail:
998 TALLOC_FREE(hdr);
999 return ret;
1003 * Fetch a record and parse it
1005 int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
1006 TDB_DATA key, bool local_copy,
1007 void (*parser)(TDB_DATA key, TDB_DATA data,
1008 void *private_data),
1009 void *private_data)
1011 struct ctdb_req_call_old req;
1012 struct ctdb_req_header *hdr = NULL;
1013 struct ctdb_reply_call_old *reply;
1014 struct iovec iov[2];
1015 ssize_t nwritten;
1016 uint32_t flags;
1017 int ret;
1019 if (ctdbd_conn_has_async_reqs(conn)) {
1021 * Can't use sync call while an async call is in flight. Adding
1022 * this check as a safety net. We'll be using different
1023 * connections for sync and async requests, so this shouldn't
1024 * happen, but who knows...
1026 DBG_ERR("Async ctdb req on sync connection\n");
1027 return EINVAL;
1030 flags = local_copy ? CTDB_WANT_READONLY : 0;
1032 ZERO_STRUCT(req);
1034 req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
1035 req.hdr.ctdb_magic = CTDB_MAGIC;
1036 req.hdr.ctdb_version = CTDB_PROTOCOL;
1037 req.hdr.operation = CTDB_REQ_CALL;
1038 req.hdr.reqid = ctdbd_next_reqid(conn);
1039 req.flags = flags;
1040 req.callid = CTDB_FETCH_FUNC;
1041 req.db_id = db_id;
1042 req.keylen = key.dsize;
1044 iov[0].iov_base = &req;
1045 iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
1046 iov[1].iov_base = key.dptr;
1047 iov[1].iov_len = key.dsize;
1049 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1050 if (nwritten == -1) {
1051 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
1052 cluster_fatal("cluster dispatch daemon msg write error\n");
1055 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
1056 if (ret != 0) {
1057 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
1058 goto fail;
1061 if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
1062 DEBUG(0, ("received invalid reply\n"));
1063 ret = EIO;
1064 goto fail;
1066 reply = (struct ctdb_reply_call_old *)hdr;
1068 if (reply->datalen == 0) {
1070 * Treat an empty record as non-existing
1072 ret = ENOENT;
1073 goto fail;
1076 parser(key, make_tdb_data(&reply->data[0], reply->datalen),
1077 private_data);
1079 ret = 0;
1080 fail:
1081 TALLOC_FREE(hdr);
1082 return ret;
1086 Traverse a ctdb database. "conn" must be an otherwise unused
1087 ctdb_connection where no other messages but the traverse ones are
1088 expected.
1091 int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
1092 void (*fn)(TDB_DATA key, TDB_DATA data,
1093 void *private_data),
1094 void *private_data)
1096 int ret;
1097 TDB_DATA key, data;
1098 struct ctdb_traverse_start t;
1099 int32_t cstatus = 0;
1101 if (ctdbd_conn_has_async_reqs(conn)) {
1103 * Can't use sync call while an async call is in flight. Adding
1104 * this check as a safety net. We'll be using different
1105 * connections for sync and async requests, so this shouldn't
1106 * happen, but who knows...
1108 DBG_ERR("Async ctdb req on sync connection\n");
1109 return EINVAL;
1112 t.db_id = db_id;
1113 t.srvid = conn->rand_srvid;
1114 t.reqid = ctdbd_next_reqid(conn);
1116 data.dptr = (uint8_t *)&t;
1117 data.dsize = sizeof(t);
1119 ret = ctdbd_control_local(conn, CTDB_CONTROL_TRAVERSE_START,
1120 conn->rand_srvid,
1121 0, data, NULL, NULL, &cstatus);
1123 if ((ret != 0) || (cstatus != 0)) {
1124 DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret),
1125 cstatus));
1127 if (ret == 0) {
1129 * We need a mapping here
1131 ret = EIO;
1133 return ret;
1136 while (true) {
1137 struct ctdb_req_header *hdr = NULL;
1138 struct ctdb_req_message_old *m;
1139 struct ctdb_rec_data_old *d;
1141 ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
1142 if (ret != 0) {
1143 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
1144 cluster_fatal("failed to read data from ctdbd\n");
1146 SMB_ASSERT(hdr != NULL);
1148 if (hdr->operation != CTDB_REQ_MESSAGE) {
1149 DEBUG(0, ("Got operation %u, expected a message\n",
1150 (unsigned)hdr->operation));
1151 return EIO;
1154 m = (struct ctdb_req_message_old *)hdr;
1155 d = (struct ctdb_rec_data_old *)&m->data[0];
1156 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1157 DEBUG(0, ("Got invalid traverse data of length %d\n",
1158 (int)m->datalen));
1159 return EIO;
1162 key.dsize = d->keylen;
1163 key.dptr = &d->data[0];
1164 data.dsize = d->datalen;
1165 data.dptr = &d->data[d->keylen];
1167 if (key.dsize == 0 && data.dsize == 0) {
1168 /* end of traverse */
1169 return 0;
1172 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1173 DEBUG(0, ("Got invalid ltdb header length %d\n",
1174 (int)data.dsize));
1175 return EIO;
1177 data.dsize -= sizeof(struct ctdb_ltdb_header);
1178 data.dptr += sizeof(struct ctdb_ltdb_header);
1180 if (fn != NULL) {
1181 fn(key, data, private_data);
1184 return 0;
/*
 * Canonicalize a socket address before handing it to ctdb.
 *
 * "in" is copied to "out". When IPv6 support is compiled in and "in" is an
 * IPv4-mapped IPv6 address (::ffff:a.b.c.d), "out" is instead rebuilt as
 * the equivalent AF_INET address with the same port.
 */
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof(*out));

#ifdef HAVE_IPV6
	{
		static const uint8_t v4mapped_prefix[12] = {
			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff
		};
		const struct sockaddr_in6 *src6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *dst4 = (struct sockaddr_in *)out;

		if ((in->ss_family != AF_INET6) ||
		    (memcmp(&src6->sin6_addr, v4mapped_prefix,
			    sizeof(v4mapped_prefix)) != 0)) {
			return;
		}

		memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
		dst4->sin_len = sizeof(*out);
#endif
		dst4->sin_family = AF_INET;
		dst4->sin_port = src6->sin6_port;
		/* the embedded IPv4 address is the last 4 of the 16 bytes */
		memcpy(&dst4->sin_addr, &src6->sin6_addr.s6_addr[12], 4);
	}
#endif
}
1215 * Register us as a server for a particular tcp connection
1218 int ctdbd_register_ips(struct ctdbd_connection *conn,
1219 const struct sockaddr_storage *_server,
1220 const struct sockaddr_storage *_client,
1221 int (*cb)(struct tevent_context *ev,
1222 uint32_t src_vnn, uint32_t dst_vnn,
1223 uint64_t dst_srvid,
1224 const uint8_t *msg, size_t msglen,
1225 void *private_data),
1226 void *private_data)
1228 struct ctdb_connection p;
1229 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1230 int ret;
1231 struct sockaddr_storage client;
1232 struct sockaddr_storage server;
1235 * Only one connection so far
1238 smbd_ctdb_canonicalize_ip(_client, &client);
1239 smbd_ctdb_canonicalize_ip(_server, &server);
1241 ZERO_STRUCT(p);
1242 switch (client.ss_family) {
1243 case AF_INET:
1244 memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
1245 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1246 break;
1247 case AF_INET6:
1248 memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
1249 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1250 break;
1251 default:
1252 return EIO;
1256 * We want to be told about IP releases
1259 ret = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1260 cb, private_data);
1261 if (ret != 0) {
1262 return ret;
1266 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1267 * can send an extra ack to trigger a reset for our client, so it
1268 * immediately reconnects
1270 ret = ctdbd_control_local(conn,
1271 CTDB_CONTROL_TCP_CLIENT, 0,
1272 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
1273 NULL);
1274 if (ret != 0) {
1275 return ret;
1277 return 0;
1280 void ctdbd_unregister_ips(struct ctdbd_connection *conn,
1281 const struct sockaddr_storage *_server,
1282 const struct sockaddr_storage *_client,
1283 int (*cb)(struct tevent_context *ev,
1284 uint32_t src_vnn,
1285 uint32_t dst_vnn,
1286 uint64_t dst_srvid,
1287 const uint8_t *msg,
1288 size_t msglen,
1289 void *private_data),
1290 void *private_data)
1292 struct ctdb_connection p;
1293 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1294 int ret;
1295 struct sockaddr_storage client;
1296 struct sockaddr_storage server;
1299 * Only one connection so far
1302 smbd_ctdb_canonicalize_ip(_client, &client);
1303 smbd_ctdb_canonicalize_ip(_server, &server);
1305 ZERO_STRUCT(p);
1306 switch (client.ss_family) {
1307 case AF_INET:
1308 memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
1309 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1310 break;
1311 case AF_INET6:
1312 memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
1313 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1314 break;
1315 default:
1316 return;
1320 * We no longer want to be told about IP releases
1321 * for the given callback/private_data combination
1323 deregister_from_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1324 cb, private_data);
1327 * inform ctdb of our tcp connection is no longer active
1329 ret = ctdbd_control_local(conn,
1330 CTDB_CONTROL_TCP_CLIENT_DISCONNECTED, 0,
1331 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
1332 NULL);
1333 if (ret != 0) {
1335 * We ignore errors here, as we'll just
1336 * no longer have a callback handler
1337 * registered and messages may just be ignored
1341 return;
1344 void ctdbd_passed_ips(struct ctdbd_connection *conn,
1345 const struct sockaddr_storage *_server,
1346 const struct sockaddr_storage *_client,
1347 int (*cb)(struct tevent_context *ev,
1348 uint32_t src_vnn,
1349 uint32_t dst_vnn,
1350 uint64_t dst_srvid,
1351 const uint8_t *msg,
1352 size_t msglen,
1353 void *private_data),
1354 void *private_data)
1356 struct ctdb_connection p;
1357 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1358 int ret;
1359 struct sockaddr_storage client;
1360 struct sockaddr_storage server;
1363 * Only one connection so far
1366 smbd_ctdb_canonicalize_ip(_client, &client);
1367 smbd_ctdb_canonicalize_ip(_server, &server);
1369 ZERO_STRUCT(p);
1370 switch (client.ss_family) {
1371 case AF_INET:
1372 memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
1373 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1374 break;
1375 case AF_INET6:
1376 memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
1377 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1378 break;
1379 default:
1380 return;
1384 * We no longer want to be told about IP releases
1385 * for the given callback/private_data combination
1387 deregister_from_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1388 cb, private_data);
1391 * inform ctdb of our tcp connection is now passed to
1392 * another process.
1394 ret = ctdbd_control_local(conn,
1395 CTDB_CONTROL_TCP_CLIENT_PASSED, 0,
1396 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
1397 NULL);
1398 if (ret != 0) {
1400 * We ignore errors here, as we'll just
1401 * no longer have a callback handler
1402 * registered and messages may just be ignored
1406 return;
1409 static int ctdbd_control_get_public_ips(struct ctdbd_connection *conn,
1410 uint32_t vnn,
1411 uint32_t flags,
1412 TALLOC_CTX *mem_ctx,
1413 struct ctdb_public_ip_list_old **_ips)
1415 struct ctdb_public_ip_list_old *ips = NULL;
1416 TDB_DATA outdata;
1417 int32_t cstatus = -1;
1418 size_t min_dsize;
1419 size_t max_ips;
1420 int ret;
1422 *_ips = NULL;
1424 ret = ctdbd_control(conn,
1425 vnn,
1426 CTDB_CONTROL_GET_PUBLIC_IPS,
1427 0, /* srvid */
1428 flags,
1429 tdb_null, /* indata */
1430 mem_ctx,
1431 &outdata,
1432 &cstatus);
1433 if (ret != 0 || cstatus != 0) {
1434 DBG_ERR("ctdb_control for getpublicips failed ret:%d cstatus:%d\n",
1435 ret, (int)cstatus);
1436 return -1;
1439 min_dsize = offsetof(struct ctdb_public_ip_list_old, ips);
1440 if (outdata.dsize < min_dsize) {
1441 DBG_ERR("outdata.dsize=%zu < min_dsize=%zu\n",
1442 outdata.dsize, min_dsize);
1443 return -1;
1445 max_ips = (outdata.dsize - min_dsize)/sizeof(struct ctdb_public_ip);
1446 ips = (struct ctdb_public_ip_list_old *)outdata.dptr;
1447 if ((size_t)ips->num > max_ips) {
1448 DBG_ERR("ips->num=%zu > max_ips=%zu\n",
1449 (size_t)ips->num, max_ips);
1450 return -1;
1453 *_ips = ips;
1454 return 0;
1457 static struct samba_sockaddr ctdbd_sock_addr_to_samba(const ctdb_sock_addr *c)
1459 struct samba_sockaddr s = {};
1461 switch (c->sa.sa_family) {
1462 case AF_INET:
1463 s.u.in = c->ip;
1464 break;
1465 case AF_INET6:
1467 * ctdb always requires HAVE_IPV6,
1468 * so we don't need an ifdef here.
1470 s.u.in6 = c->ip6;
1471 break;
1472 default:
1474 * ctdb_sock_addr only supports ipv4 and ipv6
1476 smb_panic(__location__);
1477 break;
1480 return s;
1483 int ctdbd_public_ip_foreach(struct ctdbd_connection *conn,
1484 int (*cb)(uint32_t total_ip_count,
1485 const struct sockaddr_storage *ip,
1486 bool is_movable_ip,
1487 void *private_data),
1488 void *private_data)
1490 uint32_t i;
1491 struct ctdb_public_ip_list_old *ips = NULL;
1492 int ret = ENOMEM;
1493 TALLOC_CTX *frame = talloc_stackframe();
1495 ret = ctdbd_control_get_public_ips(conn, CTDB_CURRENT_NODE, 0, frame, &ips);
1496 if (ret < 0) {
1497 ret = EIO;
1498 goto out_free;
1501 for (i=0; i < ips->num; i++) {
1502 const ctdb_sock_addr *addr = &ips->ips[i].addr;
1503 struct samba_sockaddr tmp = ctdbd_sock_addr_to_samba(addr);
1505 ret = cb(ips->num,
1506 &tmp.u.ss,
1507 true, /* all ctdb public ips are movable */
1508 private_data);
1509 if (ret != 0) {
1510 goto out_free;
1514 ret = 0;
1515 out_free:
1516 TALLOC_FREE(frame);
1517 return ret;
1521 call a control on the local node
1523 int ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1524 uint64_t srvid, uint32_t flags, TDB_DATA data,
1525 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1526 int32_t *cstatus)
1528 return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data,
1529 mem_ctx, outdata, cstatus);
1532 int ctdb_watch_us(struct ctdbd_connection *conn)
1534 struct ctdb_notify_data_old reg_data;
1535 size_t struct_len;
1536 int ret;
1537 int32_t cstatus;
1539 reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1540 reg_data.len = 1;
1541 reg_data.notify_data[0] = 0;
1543 struct_len = offsetof(struct ctdb_notify_data_old,
1544 notify_data) + reg_data.len;
1546 ret = ctdbd_control_local(
1547 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1548 make_tdb_data((uint8_t *)&reg_data, struct_len),
1549 NULL, NULL, &cstatus);
1550 if (ret != 0) {
1551 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1552 strerror(ret)));
1554 return ret;
1557 int ctdb_unwatch(struct ctdbd_connection *conn)
1559 uint64_t srvid = CTDB_SRVID_SAMBA_NOTIFY;
1560 int ret;
1561 int32_t cstatus;
1563 ret = ctdbd_control_local(
1564 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1565 make_tdb_data((uint8_t *)&srvid, sizeof(srvid)),
1566 NULL, NULL, &cstatus);
1567 if (ret != 0) {
1568 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1569 strerror(ret)));
1571 return ret;
1574 int ctdbd_probe(const char *sockname, int timeout)
1577 * Do a very early check if ctdbd is around to avoid an abort and core
1578 * later
1580 struct ctdbd_connection *conn = NULL;
1581 int ret;
1583 ret = ctdbd_init_connection(talloc_tos(), sockname, timeout,
1584 &conn);
1587 * We only care if we can connect.
1589 TALLOC_FREE(conn);
1591 return ret;
1594 static int ctdbd_connection_destructor(struct ctdbd_connection *c)
1596 if (c->fd != -1) {
1597 close(c->fd);
1598 c->fd = -1;
1600 return 0;
1603 void ctdbd_prep_hdr_next_reqid(
1604 struct ctdbd_connection *conn, struct ctdb_req_header *hdr)
1606 *hdr = (struct ctdb_req_header) {
1607 .ctdb_magic = CTDB_MAGIC,
1608 .ctdb_version = CTDB_PROTOCOL,
1609 .reqid = ctdbd_next_reqid(conn),
1610 .destnode = CTDB_CURRENT_NODE,
/*
 * State of a ctdbd_pkt_read_send() request.
 */
struct ctdbd_pkt_read_state {
	/*
	 * The complete packet as returned by read_packet_recv(), starting
	 * with its 4-byte length word.
	 */
	uint8_t *pkt;
};

static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data);
static void ctdbd_pkt_read_done(struct tevent_req *subreq);
1622 static struct tevent_req *ctdbd_pkt_read_send(
1623 TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
1625 struct tevent_req *req = NULL, *subreq = NULL;
1626 struct ctdbd_pkt_read_state *state = NULL;
1628 req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
1629 if (req == NULL) {
1630 return NULL;
1632 subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
1633 if (tevent_req_nomem(subreq, req)) {
1634 return tevent_req_post(req, ev);
1636 tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
1637 return req;
1640 static ssize_t ctdbd_pkt_read_more(
1641 uint8_t *buf, size_t buflen, void *private_data)
1643 uint32_t msglen;
1644 if (buflen < 4) {
1645 return -1;
1647 if (buflen > 4) {
1648 return 0; /* Been here, done */
1650 memcpy(&msglen, buf, 4);
1652 if (msglen < sizeof(struct ctdb_req_header)) {
1653 return -1;
1655 return msglen - sizeof(msglen);
1658 static void ctdbd_pkt_read_done(struct tevent_req *subreq)
1660 struct tevent_req *req = tevent_req_callback_data(
1661 subreq, struct tevent_req);
1662 struct ctdbd_pkt_read_state *state = tevent_req_data(
1663 req, struct ctdbd_pkt_read_state);
1664 ssize_t nread;
1665 int err;
1667 nread = read_packet_recv(subreq, state, &state->pkt, &err);
1668 TALLOC_FREE(subreq);
1669 if (nread == -1) {
1670 tevent_req_error(req, err);
1671 return;
1673 tevent_req_done(req);
1676 static int ctdbd_pkt_read_recv(
1677 struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
1679 struct ctdbd_pkt_read_state *state = tevent_req_data(
1680 req, struct ctdbd_pkt_read_state);
1681 int err;
1683 if (tevent_req_is_unix_error(req, &err)) {
1684 return err;
1686 *pkt = talloc_move(mem_ctx, &state->pkt);
1687 tevent_req_received(req);
1688 return 0;
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
static void ctdbd_conn_received(struct tevent_req *subreq);

/*
 * State of a ctdbd_req_send() request.
 */
struct ctdbd_req_state {
	/* Connection the request was sent on */
	struct ctdbd_connection *conn;
	/* Caller's event context, also used to read replies */
	struct tevent_context *ev;
	/* reqid taken from the request header, matched against replies */
	uint32_t reqid;
	/* The matching reply packet, once it has arrived */
	struct ctdb_req_header *reply;
};
1701 static void ctdbd_req_unset_pending(struct tevent_req *req)
1703 struct ctdbd_req_state *state = tevent_req_data(
1704 req, struct ctdbd_req_state);
1705 struct ctdbd_connection *conn = state->conn;
1706 size_t num_pending = talloc_array_length(conn->pending);
1707 size_t i, num_after;
1709 tevent_req_set_cleanup_fn(req, NULL);
1711 if (num_pending == 1) {
1713 * conn->read_req is a child of conn->pending
1715 TALLOC_FREE(conn->pending);
1716 conn->read_req = NULL;
1717 return;
1720 for (i=0; i<num_pending; i++) {
1721 if (req == conn->pending[i]) {
1722 break;
1725 if (i == num_pending) {
1727 * Something's seriously broken. Just returning here is the
1728 * right thing nevertheless, the point of this routine is to
1729 * remove ourselves from conn->pending.
1731 return;
1734 num_after = num_pending - i - 1;
1735 if (num_after > 0) {
1736 memmove(&conn->pending[i],
1737 &conn->pending[i] + 1,
1738 sizeof(*conn->pending) * num_after);
1740 conn->pending = talloc_realloc(
1741 NULL, conn->pending, struct tevent_req *, num_pending - 1);
1744 static void ctdbd_req_cleanup(
1745 struct tevent_req *req, enum tevent_req_state req_state)
1747 ctdbd_req_unset_pending(req);
1750 static bool ctdbd_req_set_pending(struct tevent_req *req)
1752 struct ctdbd_req_state *state = tevent_req_data(
1753 req, struct ctdbd_req_state);
1754 struct ctdbd_connection *conn = state->conn;
1755 struct tevent_req **pending = NULL;
1756 size_t num_pending = talloc_array_length(conn->pending);
1757 bool ok;
1759 pending = talloc_realloc(
1760 conn, conn->pending, struct tevent_req *, num_pending + 1);
1761 if (pending == NULL) {
1762 return false;
1764 pending[num_pending] = req;
1765 conn->pending = pending;
1767 tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);
1769 ok = ctdbd_conn_receive_next(conn);
1770 if (!ok) {
1771 ctdbd_req_unset_pending(req);
1772 return false;
1775 return true;
1778 static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
1780 size_t num_pending = talloc_array_length(conn->pending);
1781 struct tevent_req *req = NULL;
1782 struct ctdbd_req_state *state = NULL;
1784 if (conn->read_req != NULL) {
1785 return true;
1787 if (num_pending == 0) {
1789 * done for now
1791 return true;
1794 req = conn->pending[0];
1795 state = tevent_req_data(req, struct ctdbd_req_state);
1797 conn->read_req = ctdbd_pkt_read_send(
1798 conn->pending, state->ev, conn->fd);
1799 if (conn->read_req == NULL) {
1800 return false;
1802 tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
1803 return true;
1806 static void ctdbd_conn_received(struct tevent_req *subreq)
1808 struct ctdbd_connection *conn = tevent_req_callback_data(
1809 subreq, struct ctdbd_connection);
1810 TALLOC_CTX *frame = talloc_stackframe();
1811 uint8_t *pkt = NULL;
1812 int ret;
1813 struct ctdb_req_header *hdr = NULL;
1814 uint32_t reqid;
1815 struct tevent_req *req = NULL;
1816 struct ctdbd_req_state *state = NULL;
1817 size_t i, num_pending;
1818 bool ok;
1820 SMB_ASSERT(subreq == conn->read_req);
1821 conn->read_req = NULL;
1823 ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
1824 TALLOC_FREE(subreq);
1825 if (ret != 0) {
1826 cluster_fatal("ctdbd_pkt_read failed\n");
1829 hdr = (struct ctdb_req_header *)pkt;
1830 reqid = hdr->reqid;
1831 num_pending = talloc_array_length(conn->pending);
1833 for (i=0; i<num_pending; i++) {
1834 req = conn->pending[i];
1835 state = tevent_req_data(req, struct ctdbd_req_state);
1836 if (state->reqid == reqid) {
1837 break;
1841 if (i == num_pending) {
1842 /* not found */
1843 TALLOC_FREE(frame);
1844 return;
1847 state->reply = talloc_move(state, &hdr);
1848 tevent_req_defer_callback(req, state->ev);
1849 tevent_req_done(req);
1851 TALLOC_FREE(frame);
1853 ok = ctdbd_conn_receive_next(conn);
1854 if (!ok) {
1855 cluster_fatal("ctdbd_conn_receive_next failed\n");
1859 static void ctdbd_req_written(struct tevent_req *subreq);
1861 struct tevent_req *ctdbd_req_send(
1862 TALLOC_CTX *mem_ctx,
1863 struct tevent_context *ev,
1864 struct ctdbd_connection *conn,
1865 struct iovec *iov,
1866 size_t num_iov)
1868 struct tevent_req *req = NULL, *subreq = NULL;
1869 struct ctdbd_req_state *state = NULL;
1870 struct ctdb_req_header *hdr = NULL;
1871 bool ok;
1873 req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
1874 if (req == NULL) {
1875 return NULL;
1877 state->conn = conn;
1878 state->ev = ev;
1880 if ((num_iov == 0) ||
1881 (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
1882 tevent_req_error(req, EINVAL);
1883 return tevent_req_post(req, ev);
1885 hdr = iov[0].iov_base;
1886 state->reqid = hdr->reqid;
1888 ok = ctdbd_req_set_pending(req);
1889 if (!ok) {
1890 tevent_req_oom(req);
1891 return tevent_req_post(req, ev);
1894 subreq = writev_send(
1895 state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
1896 if (tevent_req_nomem(subreq, req)) {
1897 return tevent_req_post(req, ev);
1899 tevent_req_set_callback(subreq, ctdbd_req_written, req);
1901 return req;
1904 static void ctdbd_req_written(struct tevent_req *subreq)
1906 struct tevent_req *req = tevent_req_callback_data(
1907 subreq, struct tevent_req);
1908 ssize_t nwritten;
1909 int err;
1911 nwritten = writev_recv(subreq, &err);
1912 TALLOC_FREE(subreq);
1913 if (nwritten == -1) {
1914 tevent_req_error(req, err);
1915 return;
1919 int ctdbd_req_recv(
1920 struct tevent_req *req,
1921 TALLOC_CTX *mem_ctx,
1922 struct ctdb_req_header **reply)
1924 struct ctdbd_req_state *state = tevent_req_data(
1925 req, struct ctdbd_req_state);
1926 int err;
1928 if (tevent_req_is_unix_error(req, &err)) {
1929 return err;
1931 *reply = talloc_move(mem_ctx, &state->reply);
1932 tevent_req_received(req);
1933 return 0;
1936 struct ctdbd_parse_state {
1937 struct tevent_context *ev;
1938 struct ctdbd_connection *conn;
1939 uint32_t reqid;
1940 TDB_DATA key;
1941 uint8_t _keybuf[64];
1942 struct ctdb_req_call_old ctdb_req;
1943 struct iovec iov[2];
1944 void (*parser)(TDB_DATA key,
1945 TDB_DATA data,
1946 void *private_data);
1947 void *private_data;
1950 static void ctdbd_parse_done(struct tevent_req *subreq);
1952 struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
1953 struct tevent_context *ev,
1954 struct ctdbd_connection *conn,
1955 uint32_t db_id,
1956 TDB_DATA key,
1957 bool local_copy,
1958 void (*parser)(TDB_DATA key,
1959 TDB_DATA data,
1960 void *private_data),
1961 void *private_data,
1962 enum dbwrap_req_state *req_state)
1964 struct tevent_req *req = NULL;
1965 struct ctdbd_parse_state *state = NULL;
1966 uint32_t flags;
1967 uint32_t packet_length;
1968 struct tevent_req *subreq = NULL;
1970 req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
1971 if (req == NULL) {
1972 *req_state = DBWRAP_REQ_ERROR;
1973 return NULL;
1976 *req_state = DBWRAP_REQ_DISPATCHED;
1978 *state = (struct ctdbd_parse_state) {
1979 .ev = ev,
1980 .conn = conn,
1981 .reqid = ctdbd_next_reqid(conn),
1982 .parser = parser,
1983 .private_data = private_data,
1986 flags = local_copy ? CTDB_WANT_READONLY : 0;
1987 packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
1990 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires that
1991 * all passed iov elements have a lifetime longer that the tevent_req
1992 * returned by ctdb_pkt_send_send(). This is required continue sending a
1993 * the low level request into the ctdb socket, if a higher level
1994 * ('this') request is canceled (or talloc free'd) by the application
1995 * layer, without sending invalid packets to ctdb.
1997 if (key.dsize > sizeof(state->_keybuf)) {
1998 state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
1999 if (tevent_req_nomem(state->key.dptr, req)) {
2000 return tevent_req_post(req, ev);
2002 } else {
2003 memcpy(state->_keybuf, key.dptr, key.dsize);
2004 state->key.dptr = state->_keybuf;
2006 state->key.dsize = key.dsize;
2008 state->ctdb_req.hdr.length = packet_length;
2009 state->ctdb_req.hdr.ctdb_magic = CTDB_MAGIC;
2010 state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
2011 state->ctdb_req.hdr.operation = CTDB_REQ_CALL;
2012 state->ctdb_req.hdr.reqid = state->reqid;
2013 state->ctdb_req.flags = flags;
2014 state->ctdb_req.callid = CTDB_FETCH_FUNC;
2015 state->ctdb_req.db_id = db_id;
2016 state->ctdb_req.keylen = state->key.dsize;
2018 state->iov[0].iov_base = &state->ctdb_req;
2019 state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
2020 state->iov[1].iov_base = state->key.dptr;
2021 state->iov[1].iov_len = state->key.dsize;
2023 subreq = ctdbd_req_send(
2024 state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
2025 if (tevent_req_nomem(subreq, req)) {
2026 *req_state = DBWRAP_REQ_ERROR;
2027 return tevent_req_post(req, ev);
2029 tevent_req_set_callback(subreq, ctdbd_parse_done, req);
2031 return req;
2034 static void ctdbd_parse_done(struct tevent_req *subreq)
2036 struct tevent_req *req = tevent_req_callback_data(
2037 subreq, struct tevent_req);
2038 struct ctdbd_parse_state *state = tevent_req_data(
2039 req, struct ctdbd_parse_state);
2040 struct ctdb_req_header *hdr = NULL;
2041 struct ctdb_reply_call_old *reply = NULL;
2042 int ret;
2044 ret = ctdbd_req_recv(subreq, state, &hdr);
2045 TALLOC_FREE(subreq);
2046 if (tevent_req_error(req, ret)) {
2047 DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
2048 return;
2050 SMB_ASSERT(hdr != NULL);
2052 if (hdr->operation != CTDB_REPLY_CALL) {
2053 DBG_ERR("received invalid reply\n");
2054 ctdb_packet_dump(hdr);
2055 tevent_req_error(req, EIO);
2056 return;
2059 reply = (struct ctdb_reply_call_old *)hdr;
2061 if (reply->datalen == 0) {
2063 * Treat an empty record as non-existing
2065 tevent_req_error(req, ENOENT);
2066 return;
2069 state->parser(state->key,
2070 make_tdb_data(&reply->data[0], reply->datalen),
2071 state->private_data);
2073 tevent_req_done(req);
2074 return;
/*
 * Finish a ctdbd_parse_send() request. Returns 0 if parser has been
 * called. Otherwise returns the errno the request failed with, e.g.
 * ENOENT for an empty record.
 */
int ctdbd_parse_recv(struct tevent_req *req)
{
	int err = tevent_req_simple_recv_unix(req);

	return err;
}