ctdbd_conn: move CTDB_CONTROL_ENABLE_SEQNUM control to db_open_ctdb
[Samba.git] / source3 / lib / ctdbd_conn.c
blob70d3d83f031ef74bb4cb554d615f6ef5193a2568
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "replace.h"
#include <tevent.h>
#include "util_tdb.h"
#include "serverid.h"
#include "ctdbd_conn.h"
#include "system/select.h"
#include "lib/util/sys_rw_data.h"
#include "lib/util/iov_buf.h"
#include "lib/util/select.h"
#include "lib/util/debug.h"
#include "lib/util/talloc_stack.h"
#include "lib/util/genrand.h"
#include "lib/util/fault.h"
#include "lib/util/dlinklist.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/sys_rw.h"
#include "lib/util/blocking.h"
#include "ctdb/include/ctdb_protocol.h"
40 /* paths to these include files come from --with-ctdb= in configure */
42 struct ctdbd_srvid_cb {
43 uint64_t srvid;
44 int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
45 uint64_t dst_srvid,
46 const uint8_t *msg, size_t msglen,
47 void *private_data);
48 void *private_data;
51 struct ctdb_pkt_send_state;
52 struct ctdb_pkt_recv_state;
54 struct ctdbd_connection {
55 uint32_t reqid;
56 uint32_t our_vnn;
57 uint64_t rand_srvid;
58 struct ctdbd_srvid_cb *callbacks;
59 int fd;
60 int timeout;
62 /* For async connections, enabled via ctdbd_setup_fde() */
63 struct tevent_fd *fde;
65 /* State to track in-progress read */
66 struct ctdb_read_state {
67 /* Receive buffer for the initial packet length */
68 uint32_t msglen;
70 /* iovec state for current read */
71 struct iovec iov;
72 struct iovec *iovs;
73 int iovcnt;
75 /* allocated receive buffer based on packet length */
76 struct ctdb_req_header *hdr;
77 } read_state;
79 /* Lists of pending async reads and writes */
80 struct ctdb_pkt_recv_state *recv_list;
81 struct ctdb_pkt_send_state *send_list;
84 static void ctdbd_async_socket_handler(struct tevent_context *ev,
85 struct tevent_fd *fde,
86 uint16_t flags,
87 void *private_data);
89 static bool ctdbd_conn_has_async_sends(struct ctdbd_connection *conn)
91 return (conn->send_list != NULL);
94 static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
96 return (conn->fde != NULL);
99 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
101 conn->reqid += 1;
102 if (conn->reqid == 0) {
103 conn->reqid += 1;
105 return conn->reqid;
108 static int ctdbd_control(struct ctdbd_connection *conn,
109 uint32_t vnn, uint32_t opcode,
110 uint64_t srvid, uint32_t flags,
111 TDB_DATA data,
112 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
113 int32_t *cstatus);
116 * exit on fatal communications errors with the ctdbd daemon
118 static void cluster_fatal(const char *why)
120 DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
121 /* we don't use smb_panic() as we don't want to delay to write
122 a core file. We need to release this process id immediately
123 so that someone else can take over without getting sharing
124 violations */
125 _exit(1);
131 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
133 if (DEBUGLEVEL < 11) {
134 return;
136 DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
137 (int)hdr->length, (int)hdr->ctdb_magic,
138 (int)hdr->ctdb_version, (int)hdr->generation,
139 (int)hdr->operation, (int)hdr->reqid));
143 * Register a srvid with ctdbd
145 int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
146 int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
147 uint64_t dst_srvid,
148 const uint8_t *msg, size_t msglen,
149 void *private_data),
150 void *private_data)
153 int ret;
154 int32_t cstatus;
155 size_t num_callbacks;
156 struct ctdbd_srvid_cb *tmp;
158 ret = ctdbd_control_local(conn, CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
159 tdb_null, NULL, NULL, &cstatus);
160 if (ret != 0) {
161 return ret;
164 num_callbacks = talloc_array_length(conn->callbacks);
166 tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
167 num_callbacks + 1);
168 if (tmp == NULL) {
169 return ENOMEM;
171 conn->callbacks = tmp;
173 conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
174 .srvid = srvid, .cb = cb, .private_data = private_data
177 return 0;
180 static int ctdbd_msg_call_back(struct ctdbd_connection *conn,
181 struct ctdb_req_message_old *msg)
183 uint32_t msg_len;
184 size_t i, num_callbacks;
186 msg_len = msg->hdr.length;
187 if (msg_len < offsetof(struct ctdb_req_message_old, data)) {
188 DBG_DEBUG("len %"PRIu32" too small\n", msg_len);
189 return 0;
191 msg_len -= offsetof(struct ctdb_req_message_old, data);
193 if (msg_len < msg->datalen) {
194 DBG_DEBUG("msg_len=%"PRIu32" < msg->datalen=%"PRIu32"\n",
195 msg_len, msg->datalen);
196 return 0;
199 num_callbacks = talloc_array_length(conn->callbacks);
201 for (i=0; i<num_callbacks; i++) {
202 struct ctdbd_srvid_cb *cb = &conn->callbacks[i];
204 if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
205 int ret;
207 ret = cb->cb(msg->hdr.srcnode, msg->hdr.destnode,
208 msg->srvid, msg->data, msg->datalen,
209 cb->private_data);
210 if (ret != 0) {
211 return ret;
215 return 0;
219 * get our vnn from the cluster
221 static int get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
223 int32_t cstatus=-1;
224 int ret;
225 ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_PNN, 0, 0,
226 tdb_null, NULL, NULL, &cstatus);
227 if (ret != 0) {
228 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
229 return ret;
231 *vnn = (uint32_t)cstatus;
232 return ret;
236 * Are we active (i.e. not banned or stopped?)
238 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
240 int32_t cstatus=-1;
241 TDB_DATA outdata;
242 struct ctdb_node_map_old *m;
243 bool ok = false;
244 uint32_t i;
245 int ret;
247 ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_NODEMAP, 0, 0,
248 tdb_null, talloc_tos(), &outdata, &cstatus);
249 if (ret != 0) {
250 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
251 return false;
253 if ((cstatus != 0) || (outdata.dptr == NULL)) {
254 DEBUG(2, ("Received invalid ctdb data\n"));
255 return false;
258 m = (struct ctdb_node_map_old *)outdata.dptr;
260 for (i=0; i<m->num; i++) {
261 if (vnn == m->nodes[i].pnn) {
262 break;
266 if (i == m->num) {
267 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
268 (int)vnn));
269 goto fail;
272 if ((m->nodes[i].flags & NODE_FLAGS_INACTIVE) != 0) {
273 DEBUG(2, ("Node has status %x, not active\n",
274 (int)m->nodes[i].flags));
275 goto fail;
278 ok = true;
279 fail:
280 TALLOC_FREE(outdata.dptr);
281 return ok;
284 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
286 return conn->our_vnn;
290 * Get us a ctdb connection
293 static int ctdbd_connect(const char *sockname, int *pfd)
295 struct sockaddr_un addr = { 0, };
296 int fd;
297 socklen_t salen;
298 size_t namelen;
300 fd = socket(AF_UNIX, SOCK_STREAM, 0);
301 if (fd == -1) {
302 int err = errno;
303 DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
304 return err;
307 addr.sun_family = AF_UNIX;
309 namelen = strlcpy(addr.sun_path, sockname, sizeof(addr.sun_path));
310 if (namelen >= sizeof(addr.sun_path)) {
311 DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
312 sockname));
313 close(fd);
314 return ENAMETOOLONG;
317 salen = sizeof(struct sockaddr_un);
319 if (connect(fd, (struct sockaddr *)(void *)&addr, salen) == -1) {
320 int err = errno;
321 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
322 strerror(err)));
323 close(fd);
324 return err;
327 *pfd = fd;
328 return 0;
331 static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
332 struct ctdb_req_header **result)
334 struct ctdb_req_header *req;
335 uint32_t msglen;
336 ssize_t nread;
338 if (timeout != -1) {
339 struct pollfd pfd = { .fd = fd, .events = POLLIN };
340 int ret;
342 ret = sys_poll_intr(&pfd, 1, timeout);
343 if (ret == -1) {
344 return errno;
346 if (ret == 0) {
347 return ETIMEDOUT;
349 if (ret != 1) {
350 return EIO;
354 nread = read_data(fd, &msglen, sizeof(msglen));
355 if (nread == -1) {
356 return errno;
358 if (nread == 0) {
359 return EIO;
362 if (msglen < sizeof(struct ctdb_req_header)) {
363 return EIO;
366 req = talloc_size(mem_ctx, msglen);
367 if (req == NULL) {
368 return ENOMEM;
370 talloc_set_name_const(req, "struct ctdb_req_header");
372 req->length = msglen;
374 nread = read_data(fd, ((char *)req) + sizeof(msglen),
375 msglen - sizeof(msglen));
376 if (nread == -1) {
377 TALLOC_FREE(req);
378 return errno;
380 if (nread == 0) {
381 TALLOC_FREE(req);
382 return EIO;
385 *result = req;
386 return 0;
390 * Read a full ctdbd request. If we have a messaging context, defer incoming
391 * messages that might come in between.
394 static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
395 TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
397 struct ctdb_req_header *hdr;
398 int ret;
400 next_pkt:
402 ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
403 if (ret != 0) {
404 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
405 cluster_fatal("ctdbd died\n");
408 DEBUG(11, ("Received ctdb packet\n"));
409 ctdb_packet_dump(hdr);
411 if (hdr->operation == CTDB_REQ_MESSAGE) {
412 struct ctdb_req_message_old *msg = (struct ctdb_req_message_old *)hdr;
414 ret = ctdbd_msg_call_back(conn, msg);
415 if (ret != 0) {
416 TALLOC_FREE(hdr);
417 return ret;
420 TALLOC_FREE(hdr);
421 goto next_pkt;
424 if ((reqid != 0) && (hdr->reqid != reqid)) {
425 /* we got the wrong reply */
426 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
427 "been %u\n", hdr->reqid, reqid));
428 TALLOC_FREE(hdr);
429 goto next_pkt;
432 *result = talloc_move(mem_ctx, &hdr);
434 return 0;
438 * This prepares conn for handling async requests
440 int ctdbd_setup_fde(struct ctdbd_connection *conn, struct tevent_context *ev)
442 int ret;
444 ret = set_blocking(conn->fd, false);
445 if (ret == -1) {
446 return errno;
449 conn->fde = tevent_add_fd(ev,
450 conn,
451 conn->fd,
452 TEVENT_FD_READ,
453 ctdbd_async_socket_handler,
454 conn);
455 if (conn->fde == NULL) {
456 return ENOMEM;
459 return 0;
462 static int ctdbd_connection_destructor(struct ctdbd_connection *c);
465 * Get us a ctdbd connection
468 static int ctdbd_init_connection_internal(TALLOC_CTX *mem_ctx,
469 const char *sockname, int timeout,
470 struct ctdbd_connection *conn)
472 int ret;
474 conn->timeout = timeout;
475 if (conn->timeout == 0) {
476 conn->timeout = -1;
479 ret = ctdbd_connect(sockname, &conn->fd);
480 if (ret != 0) {
481 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
482 return ret;
484 talloc_set_destructor(conn, ctdbd_connection_destructor);
486 ret = get_cluster_vnn(conn, &conn->our_vnn);
487 if (ret != 0) {
488 DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret)));
489 return ret;
492 if (!ctdbd_working(conn, conn->our_vnn)) {
493 DEBUG(2, ("Node is not working, can not connect\n"));
494 return EIO;
497 generate_random_buffer((unsigned char *)&conn->rand_srvid,
498 sizeof(conn->rand_srvid));
500 ret = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
501 if (ret != 0) {
502 DEBUG(5, ("Could not register random srvid: %s\n",
503 strerror(ret)));
504 return ret;
507 return 0;
510 int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
511 const char *sockname, int timeout,
512 struct ctdbd_connection **pconn)
514 struct ctdbd_connection *conn;
515 int ret;
517 if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
518 DEBUG(0, ("talloc failed\n"));
519 return ENOMEM;
522 ret = ctdbd_init_connection_internal(mem_ctx,
523 sockname,
524 timeout,
525 conn);
526 if (ret != 0) {
527 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
528 strerror(ret));
529 goto fail;
532 *pconn = conn;
533 return 0;
535 fail:
536 TALLOC_FREE(conn);
537 return ret;
540 int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
541 const char *sockname, int timeout,
542 struct ctdbd_connection *conn)
544 int ret;
546 ret = ctdbd_connection_destructor(conn);
547 if (ret != 0) {
548 DBG_ERR("ctdbd_connection_destructor failed\n");
549 return ret;
552 ret = ctdbd_init_connection_internal(mem_ctx,
553 sockname,
554 timeout,
555 conn);
556 if (ret != 0) {
557 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
558 strerror(ret));
559 return ret;
562 return 0;
565 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
567 return conn->fd;
571 * Packet handler to receive and handle a ctdb message
573 static int ctdb_handle_message(struct ctdbd_connection *conn,
574 struct ctdb_req_header *hdr)
576 struct ctdb_req_message_old *msg;
578 if (hdr->operation != CTDB_REQ_MESSAGE) {
579 DEBUG(0, ("Received async msg of type %u, discarding\n",
580 hdr->operation));
581 return EINVAL;
584 msg = (struct ctdb_req_message_old *)hdr;
586 ctdbd_msg_call_back(conn, msg);
588 return 0;
591 void ctdbd_socket_readable(struct ctdbd_connection *conn)
593 struct ctdb_req_header *hdr = NULL;
594 int ret;
596 ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
597 if (ret != 0) {
598 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
599 cluster_fatal("ctdbd died\n");
602 ret = ctdb_handle_message(conn, hdr);
604 TALLOC_FREE(hdr);
606 if (ret != 0) {
607 DEBUG(10, ("could not handle incoming message: %s\n",
608 strerror(ret)));
612 static int ctdb_pkt_send_handler(struct ctdbd_connection *conn);
613 static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn);
615 /* Used for async connection and async ctcb requests */
616 static void ctdbd_async_socket_handler(struct tevent_context *ev,
617 struct tevent_fd *fde,
618 uint16_t flags,
619 void *private_data)
621 struct ctdbd_connection *conn = talloc_get_type_abort(
622 private_data, struct ctdbd_connection);
623 int ret;
625 if ((flags & TEVENT_FD_READ) != 0) {
626 ret = ctdb_pkt_recv_handler(conn);
627 if (ret != 0) {
628 DBG_DEBUG("ctdb_read_iov_handler returned %s\n",
629 strerror(ret));
631 return;
634 if ((flags & TEVENT_FD_WRITE) != 0) {
635 ret = ctdb_pkt_send_handler(conn);
636 if (ret != 0) {
637 DBG_DEBUG("ctdb_write_iov_handler returned %s\n",
638 strerror(ret));
639 return;
641 return;
644 return;
647 int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
648 uint32_t dst_vnn, uint64_t dst_srvid,
649 const struct iovec *iov, int iovlen)
651 struct ctdb_req_message_old r;
652 struct iovec iov2[iovlen+1];
653 size_t buflen = iov_buflen(iov, iovlen);
654 ssize_t nwritten;
656 r.hdr.length = offsetof(struct ctdb_req_message_old, data) + buflen;
657 r.hdr.ctdb_magic = CTDB_MAGIC;
658 r.hdr.ctdb_version = CTDB_PROTOCOL;
659 r.hdr.generation = 1;
660 r.hdr.operation = CTDB_REQ_MESSAGE;
661 r.hdr.destnode = dst_vnn;
662 r.hdr.srcnode = conn->our_vnn;
663 r.hdr.reqid = 0;
664 r.srvid = dst_srvid;
665 r.datalen = buflen;
667 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
668 ctdb_packet_dump(&r.hdr);
670 iov2[0].iov_base = &r;
671 iov2[0].iov_len = offsetof(struct ctdb_req_message_old, data);
672 memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));
674 nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
675 if (nwritten == -1) {
676 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
677 cluster_fatal("cluster dispatch daemon msg write error\n");
680 return 0;
684 * send/recv a generic ctdb control message
686 static int ctdbd_control(struct ctdbd_connection *conn,
687 uint32_t vnn, uint32_t opcode,
688 uint64_t srvid, uint32_t flags,
689 TDB_DATA data,
690 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
691 int32_t *cstatus)
693 struct ctdb_req_control_old req;
694 struct ctdb_req_header *hdr;
695 struct ctdb_reply_control_old *reply = NULL;
696 struct iovec iov[2];
697 ssize_t nwritten;
698 int ret;
700 if (ctdbd_conn_has_async_reqs(conn)) {
702 * Can't use sync call while an async call is in flight. Adding
703 * this check as a safety net. We'll be using different
704 * connections for sync and async requests, so this shouldn't
705 * happen, but who knows...
707 DBG_ERR("Async ctdb req on sync connection\n");
708 return EINVAL;
711 ZERO_STRUCT(req);
712 req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
713 req.hdr.ctdb_magic = CTDB_MAGIC;
714 req.hdr.ctdb_version = CTDB_PROTOCOL;
715 req.hdr.operation = CTDB_REQ_CONTROL;
716 req.hdr.reqid = ctdbd_next_reqid(conn);
717 req.hdr.destnode = vnn;
718 req.opcode = opcode;
719 req.srvid = srvid;
720 req.datalen = data.dsize;
721 req.flags = flags;
723 DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32", vnn=%"PRIu32", "
724 "opcode=%"PRIu32", srvid=%"PRIu64"\n", req.hdr.reqid,
725 req.hdr.destnode, req.opcode, req.srvid);
726 ctdb_packet_dump(&req.hdr);
728 iov[0].iov_base = &req;
729 iov[0].iov_len = offsetof(struct ctdb_req_control_old, data);
730 iov[1].iov_base = data.dptr;
731 iov[1].iov_len = data.dsize;
733 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
734 if (nwritten == -1) {
735 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
736 cluster_fatal("cluster dispatch daemon msg write error\n");
739 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
740 if (cstatus) {
741 *cstatus = 0;
743 return 0;
746 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
747 if (ret != 0) {
748 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
749 return ret;
752 if (hdr->operation != CTDB_REPLY_CONTROL) {
753 DEBUG(0, ("received invalid reply\n"));
754 TALLOC_FREE(hdr);
755 return EIO;
757 reply = (struct ctdb_reply_control_old *)hdr;
759 if (outdata) {
760 if (!(outdata->dptr = (uint8_t *)talloc_memdup(
761 mem_ctx, reply->data, reply->datalen))) {
762 TALLOC_FREE(reply);
763 return ENOMEM;
765 outdata->dsize = reply->datalen;
767 if (cstatus) {
768 (*cstatus) = reply->status;
771 TALLOC_FREE(reply);
772 return ret;
776 * see if a remote process exists
778 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid)
780 int32_t cstatus = 0;
781 int ret;
783 ret = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, 0,
784 (TDB_DATA) { .dptr = (uint8_t *)&pid,
785 .dsize = sizeof(pid) },
786 NULL, NULL, &cstatus);
787 if (ret != 0) {
788 return false;
790 return (cstatus == 0);
794 * Get a db path
796 char *ctdbd_dbpath(struct ctdbd_connection *conn,
797 TALLOC_CTX *mem_ctx, uint32_t db_id)
799 int ret;
800 TDB_DATA data;
801 TDB_DATA rdata = {0};
802 int32_t cstatus = 0;
804 data.dptr = (uint8_t*)&db_id;
805 data.dsize = sizeof(db_id);
807 ret = ctdbd_control_local(conn, CTDB_CONTROL_GETDBPATH, 0, 0, data,
808 mem_ctx, &rdata, &cstatus);
809 if ((ret != 0) || cstatus != 0) {
810 DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s\n",
811 strerror(ret)));
812 return NULL;
815 return (char *)rdata.dptr;
819 * attach to a ctdb database
821 int ctdbd_db_attach(struct ctdbd_connection *conn,
822 const char *name, uint32_t *db_id, int tdb_flags)
824 int ret;
825 TDB_DATA data;
826 int32_t cstatus;
827 bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
829 data = string_term_tdb_data(name);
831 ret = ctdbd_control_local(conn,
832 persistent
833 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
834 : CTDB_CONTROL_DB_ATTACH,
835 0, 0, data, NULL, &data, &cstatus);
836 if (ret != 0) {
837 DEBUG(0, (__location__ " ctdb_control for db_attach "
838 "failed: %s\n", strerror(ret)));
839 return ret;
842 if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
843 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
844 return EIO;
847 *db_id = *(uint32_t *)data.dptr;
848 talloc_free(data.dptr);
850 return 0;
854 * force the migration of a record to this node
856 int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
858 struct ctdb_req_call_old req;
859 struct ctdb_req_header *hdr = NULL;
860 struct iovec iov[2];
861 ssize_t nwritten;
862 int ret;
864 if (ctdbd_conn_has_async_reqs(conn)) {
866 * Can't use sync call while an async call is in flight. Adding
867 * this check as a safety net. We'll be using different
868 * connections for sync and async requests, so this shouldn't
869 * happen, but who knows...
871 DBG_ERR("Async ctdb req on sync connection\n");
872 return EINVAL;
875 ZERO_STRUCT(req);
877 req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
878 req.hdr.ctdb_magic = CTDB_MAGIC;
879 req.hdr.ctdb_version = CTDB_PROTOCOL;
880 req.hdr.operation = CTDB_REQ_CALL;
881 req.hdr.reqid = ctdbd_next_reqid(conn);
882 req.flags = CTDB_IMMEDIATE_MIGRATION;
883 req.callid = CTDB_NULL_FUNC;
884 req.db_id = db_id;
885 req.keylen = key.dsize;
887 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
888 ctdb_packet_dump(&req.hdr);
890 iov[0].iov_base = &req;
891 iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
892 iov[1].iov_base = key.dptr;
893 iov[1].iov_len = key.dsize;
895 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
896 if (nwritten == -1) {
897 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
898 cluster_fatal("cluster dispatch daemon msg write error\n");
901 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
902 if (ret != 0) {
903 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
904 goto fail;
907 if (hdr->operation != CTDB_REPLY_CALL) {
908 if (hdr->operation == CTDB_REPLY_ERROR) {
909 DBG_ERR("received error from ctdb\n");
910 } else {
911 DBG_ERR("received invalid reply\n");
913 ret = EIO;
914 goto fail;
917 fail:
919 TALLOC_FREE(hdr);
920 return ret;
924 * Fetch a record and parse it
926 int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
927 TDB_DATA key, bool local_copy,
928 void (*parser)(TDB_DATA key, TDB_DATA data,
929 void *private_data),
930 void *private_data)
932 struct ctdb_req_call_old req;
933 struct ctdb_req_header *hdr = NULL;
934 struct ctdb_reply_call_old *reply;
935 struct iovec iov[2];
936 ssize_t nwritten;
937 uint32_t flags;
938 int ret;
940 if (ctdbd_conn_has_async_reqs(conn)) {
942 * Can't use sync call while an async call is in flight. Adding
943 * this check as a safety net. We'll be using different
944 * connections for sync and async requests, so this shouldn't
945 * happen, but who knows...
947 DBG_ERR("Async ctdb req on sync connection\n");
948 return EINVAL;
951 flags = local_copy ? CTDB_WANT_READONLY : 0;
953 ZERO_STRUCT(req);
955 req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
956 req.hdr.ctdb_magic = CTDB_MAGIC;
957 req.hdr.ctdb_version = CTDB_PROTOCOL;
958 req.hdr.operation = CTDB_REQ_CALL;
959 req.hdr.reqid = ctdbd_next_reqid(conn);
960 req.flags = flags;
961 req.callid = CTDB_FETCH_FUNC;
962 req.db_id = db_id;
963 req.keylen = key.dsize;
965 iov[0].iov_base = &req;
966 iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
967 iov[1].iov_base = key.dptr;
968 iov[1].iov_len = key.dsize;
970 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
971 if (nwritten == -1) {
972 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
973 cluster_fatal("cluster dispatch daemon msg write error\n");
976 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
977 if (ret != 0) {
978 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
979 goto fail;
982 if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
983 DEBUG(0, ("received invalid reply\n"));
984 ret = EIO;
985 goto fail;
987 reply = (struct ctdb_reply_call_old *)hdr;
989 if (reply->datalen == 0) {
991 * Treat an empty record as non-existing
993 ret = ENOENT;
994 goto fail;
997 parser(key, make_tdb_data(&reply->data[0], reply->datalen),
998 private_data);
1000 ret = 0;
1001 fail:
1002 TALLOC_FREE(hdr);
1003 return ret;
1007 Traverse a ctdb database. "conn" must be an otherwise unused
1008 ctdb_connection where no other messages but the traverse ones are
1009 expected.
1012 int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
1013 void (*fn)(TDB_DATA key, TDB_DATA data,
1014 void *private_data),
1015 void *private_data)
1017 int ret;
1018 TDB_DATA key, data;
1019 struct ctdb_traverse_start t;
1020 int32_t cstatus;
1022 if (ctdbd_conn_has_async_reqs(conn)) {
1024 * Can't use sync call while an async call is in flight. Adding
1025 * this check as a safety net. We'll be using different
1026 * connections for sync and async requests, so this shouldn't
1027 * happen, but who knows...
1029 DBG_ERR("Async ctdb req on sync connection\n");
1030 return EINVAL;
1033 t.db_id = db_id;
1034 t.srvid = conn->rand_srvid;
1035 t.reqid = ctdbd_next_reqid(conn);
1037 data.dptr = (uint8_t *)&t;
1038 data.dsize = sizeof(t);
1040 ret = ctdbd_control_local(conn, CTDB_CONTROL_TRAVERSE_START,
1041 conn->rand_srvid,
1042 0, data, NULL, NULL, &cstatus);
1044 if ((ret != 0) || (cstatus != 0)) {
1045 DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret),
1046 cstatus));
1048 if (ret == 0) {
1050 * We need a mapping here
1052 ret = EIO;
1054 return ret;
1057 while (true) {
1058 struct ctdb_req_header *hdr = NULL;
1059 struct ctdb_req_message_old *m;
1060 struct ctdb_rec_data_old *d;
1062 ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
1063 if (ret != 0) {
1064 DEBUG(0, ("ctdb_read_packet failed: %s\n",
1065 strerror(ret)));
1066 cluster_fatal("ctdbd died\n");
1069 if (hdr->operation != CTDB_REQ_MESSAGE) {
1070 DEBUG(0, ("Got operation %u, expected a message\n",
1071 (unsigned)hdr->operation));
1072 return EIO;
1075 m = (struct ctdb_req_message_old *)hdr;
1076 d = (struct ctdb_rec_data_old *)&m->data[0];
1077 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1078 DEBUG(0, ("Got invalid traverse data of length %d\n",
1079 (int)m->datalen));
1080 return EIO;
1083 key.dsize = d->keylen;
1084 key.dptr = &d->data[0];
1085 data.dsize = d->datalen;
1086 data.dptr = &d->data[d->keylen];
1088 if (key.dsize == 0 && data.dsize == 0) {
1089 /* end of traverse */
1090 return 0;
1093 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1094 DEBUG(0, ("Got invalid ltdb header length %d\n",
1095 (int)data.dsize));
1096 return EIO;
1098 data.dsize -= sizeof(struct ctdb_ltdb_header);
1099 data.dptr += sizeof(struct ctdb_ltdb_header);
1101 if (fn != NULL) {
1102 fn(key, data, private_data);
1105 return 0;
1109 This is used to canonicalize a ctdb_sock_addr structure.
1111 static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
1112 struct sockaddr_storage *out)
1114 memcpy(out, in, sizeof (*out));
1116 #ifdef HAVE_IPV6
1117 if (in->ss_family == AF_INET6) {
1118 const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
1119 const struct sockaddr_in6 *in6 =
1120 (const struct sockaddr_in6 *)in;
1121 struct sockaddr_in *out4 = (struct sockaddr_in *)out;
1122 if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
1123 memset(out, 0, sizeof(*out));
1124 #ifdef HAVE_SOCK_SIN_LEN
1125 out4->sin_len = sizeof(*out);
1126 #endif
1127 out4->sin_family = AF_INET;
1128 out4->sin_port = in6->sin6_port;
1129 memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
1132 #endif
1136 * Register us as a server for a particular tcp connection
1139 int ctdbd_register_ips(struct ctdbd_connection *conn,
1140 const struct sockaddr_storage *_server,
1141 const struct sockaddr_storage *_client,
1142 int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
1143 uint64_t dst_srvid,
1144 const uint8_t *msg, size_t msglen,
1145 void *private_data),
1146 void *private_data)
1148 struct ctdb_connection p;
1149 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1150 int ret;
1151 struct sockaddr_storage client;
1152 struct sockaddr_storage server;
1155 * Only one connection so far
1158 smbd_ctdb_canonicalize_ip(_client, &client);
1159 smbd_ctdb_canonicalize_ip(_server, &server);
1161 switch (client.ss_family) {
1162 case AF_INET:
1163 memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
1164 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1165 break;
1166 case AF_INET6:
1167 memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
1168 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1169 break;
1170 default:
1171 return EIO;
1175 * We want to be told about IP releases
1178 ret = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1179 cb, private_data);
1180 if (ret != 0) {
1181 return ret;
1185 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1186 * can send an extra ack to trigger a reset for our client, so it
1187 * immediately reconnects
1189 ret = ctdbd_control(conn, CTDB_CURRENT_NODE,
1190 CTDB_CONTROL_TCP_CLIENT, 0,
1191 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
1192 NULL);
1193 if (ret != 0) {
1194 return ret;
1196 return 0;
1200 call a control on the local node
1202 int ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1203 uint64_t srvid, uint32_t flags, TDB_DATA data,
1204 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1205 int32_t *cstatus)
1207 return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data,
1208 mem_ctx, outdata, cstatus);
1211 int ctdb_watch_us(struct ctdbd_connection *conn)
1213 struct ctdb_notify_data_old reg_data;
1214 size_t struct_len;
1215 int ret;
1216 int32_t cstatus;
1218 reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1219 reg_data.len = 1;
1220 reg_data.notify_data[0] = 0;
1222 struct_len = offsetof(struct ctdb_notify_data_old,
1223 notify_data) + reg_data.len;
1225 ret = ctdbd_control_local(
1226 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1227 make_tdb_data((uint8_t *)&reg_data, struct_len),
1228 NULL, NULL, &cstatus);
1229 if (ret != 0) {
1230 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1231 strerror(ret)));
1233 return ret;
1236 int ctdb_unwatch(struct ctdbd_connection *conn)
1238 uint64_t srvid = CTDB_SRVID_SAMBA_NOTIFY;
1239 int ret;
1240 int32_t cstatus;
1242 ret = ctdbd_control_local(
1243 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1244 make_tdb_data((uint8_t *)&srvid, sizeof(srvid)),
1245 NULL, NULL, &cstatus);
1246 if (ret != 0) {
1247 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1248 strerror(ret)));
1250 return ret;
1253 int ctdbd_probe(const char *sockname, int timeout)
1256 * Do a very early check if ctdbd is around to avoid an abort and core
1257 * later
1259 struct ctdbd_connection *conn = NULL;
1260 int ret;
1262 ret = ctdbd_init_connection(talloc_tos(), sockname, timeout,
1263 &conn);
1266 * We only care if we can connect.
1268 TALLOC_FREE(conn);
1270 return ret;
1273 struct ctdb_pkt_send_state {
1274 struct ctdb_pkt_send_state *prev, *next;
1275 struct tevent_context *ev;
1276 struct ctdbd_connection *conn;
1278 /* ctdb request id */
1279 uint32_t reqid;
1281 /* the associated tevent request */
1282 struct tevent_req *req;
1284 /* iovec array with data to send */
1285 struct iovec _iov;
1286 struct iovec *iov;
1287 int iovcnt;
1289 /* Initial packet length */
1290 size_t packet_len;
1293 static void ctdb_pkt_send_cleanup(struct tevent_req *req,
1294 enum tevent_req_state req_state);
1297 * Asynchronously send a ctdb packet given as iovec array
1299 * Note: the passed iov array is not const here. Similar
1300 * functions in samba take a const array and create a copy
1301 * before calling iov_advance() on the array.
1303 * This function will modify the iov array! But
1304 * this is a static function and our only caller
1305 * ctdb_parse_send/recv is preparared for this to
1306 * happen!
1308 static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
1309 struct tevent_context *ev,
1310 struct ctdbd_connection *conn,
1311 uint32_t reqid,
1312 struct iovec *iov,
1313 int iovcnt,
1314 enum dbwrap_req_state *req_state)
1316 struct tevent_req *req = NULL;
1317 struct ctdb_pkt_send_state *state = NULL;
1318 ssize_t nwritten;
1319 bool ok;
1321 DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);
1323 req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
1324 if (req == NULL) {
1325 return NULL;
1328 *state = (struct ctdb_pkt_send_state) {
1329 .ev = ev,
1330 .conn = conn,
1331 .req = req,
1332 .reqid = reqid,
1333 .iov = iov,
1334 .iovcnt = iovcnt,
1335 .packet_len = iov_buflen(iov, iovcnt),
1338 tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);
1340 *req_state = DBWRAP_REQ_QUEUED;
1342 if (ctdbd_conn_has_async_sends(conn)) {
1344 * Can't attempt direct write with messages already queued and
1345 * possibly in progress
1347 DLIST_ADD_END(conn->send_list, state);
1348 return req;
1352 * Attempt a direct write. If this returns short, shedule the
1353 * remaining data as an async write, otherwise we're already done.
1356 nwritten = writev(conn->fd, state->iov, state->iovcnt);
1357 if (nwritten == state->packet_len) {
1358 DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);
1360 *req_state = DBWRAP_REQ_DISPATCHED;
1361 tevent_req_done(req);
1362 return tevent_req_post(req, ev);
1365 if (nwritten == -1) {
1366 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
1367 cluster_fatal("cluster write error\n");
1369 nwritten = 0;
1372 DBG_DEBUG("Posting async write of reqid [%" PRIu32"]"
1373 "after short write [%zd]\n", reqid, nwritten);
1375 ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
1376 if (!ok) {
1377 *req_state = DBWRAP_REQ_ERROR;
1378 tevent_req_error(req, EIO);
1379 return tevent_req_post(req, ev);
1383 * As this is the first async write req we post, we must enable
1384 * fd-writable events.
1386 TEVENT_FD_WRITEABLE(conn->fde);
1387 DLIST_ADD_END(conn->send_list, state);
1388 return req;
/*
 * Talloc destructor for a send state whose tevent_req was cancelled
 * while the packet was in flight.
 *
 * Returns 0 to allow the free when the request was never (partially)
 * written, and -1 to VETO the free otherwise: the state is reparented
 * from the dying request onto the connection so the partially-sent
 * packet can still be written out completely, keeping the byte stream
 * to ctdbd consistent.
 */
static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state *state)
{
	struct ctdbd_connection *conn = state->conn;

	/* Already detached from the connection, nothing to do */
	if (conn == NULL) {
		return 0;
	}

	if (state->req == NULL) {
		/* Not yet on the wire: safe to just drop from the queue */
		DBG_DEBUG("Removing cancelled reqid [%" PRIu32"]\n",
			  state->reqid);
		state->conn = NULL;
		DLIST_REMOVE(conn->send_list, state);
		return 0;
	}

	DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32"]\n",
		  state->reqid);

	/* Keep the state alive under conn; the handler frees it once
	 * the remaining bytes have been flushed. */
	talloc_reparent(state->req, conn, state);
	state->req = NULL;
	return -1;
}
/*
 * tevent_req cleanup hook for async sends.
 *
 * Three cases:
 *  - nothing sent yet: remove the state from conn->send_list;
 *  - partially sent and the request is being finally released
 *    (TEVENT_REQ_RECEIVED): flatten the remaining iov into a single
 *    state-owned buffer and install a destructor that keeps the state
 *    alive on the connection until fully flushed;
 *  - partially sent but not yet in the final state: do nothing (the
 *    final TEVENT_REQ_RECEIVED invocation handles it exactly once).
 */
static void ctdb_pkt_send_cleanup(struct tevent_req *req,
				  enum tevent_req_state req_state)
{
	struct ctdb_pkt_send_state *state = tevent_req_data(
		req, struct ctdb_pkt_send_state);
	struct ctdbd_connection *conn = state->conn;
	size_t missing_len = 0;

	if (conn == NULL) {
		return;
	}

	missing_len = iov_buflen(state->iov, state->iovcnt);
	if (state->packet_len == missing_len) {
		/*
		 * We haven't yet started sending this one, so we can just
		 * remove it from the pending list
		 */
		missing_len = 0;
	}
	if (missing_len != 0) {
		uint8_t *buf = NULL;

		if (req_state != TEVENT_REQ_RECEIVED) {
			/*
			 * Wait til the req_state is TEVENT_REQ_RECEIVED, as
			 * that will be the final state when the request state
			 * is talloc_free'd from tevent_req_received(). Which
			 * ensures we only run the following code *ONCE*!
			 */
			return;
		}

		DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32"]\n",
			  state->reqid);
		/*
		 * A request in progress of being sent. Reparent the iov buffer
		 * so we can continue sending the request. See also the comment
		 * in ctdbd_parse_send() when copying the key buffer.
		 */
		buf = iov_concat(state, state->iov, state->iovcnt);
		if (buf == NULL) {
			cluster_fatal("iov_concat error\n");
			return;
		}

		/* Point the iov at the flattened remainder owned by state */
		state->iovcnt = 1;
		state->_iov.iov_base = buf;
		state->_iov.iov_len = missing_len;
		state->iov = &state->_iov;

		talloc_set_destructor(state, ctdb_pkt_send_state_destructor);
		return;
	}

	DBG_DEBUG("Removing pending reqid [%" PRIu32"]\n", state->reqid);

	state->conn = NULL;
	DLIST_REMOVE(conn->send_list, state);

	if (!ctdbd_conn_has_async_sends(conn)) {
		/* Last pending send gone: stop fd-writable events */
		DBG_DEBUG("No more sends, disabling fd-writable events\n");
		TEVENT_FD_NOT_WRITEABLE(conn->fde);
	}
}
/*
 * fd-writable event handler: flush the head of conn->send_list.
 *
 * Writes as much of the oldest pending packet as the socket accepts.
 * Completion of cancelled requests (state->req == NULL after a
 * reparent in ctdb_pkt_send_cleanup()) frees the state directly;
 * otherwise the tevent_req is completed. Returns 0 or an errno value.
 */
static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
{
	struct ctdb_pkt_send_state *state = NULL;
	ssize_t nwritten;
	ssize_t iovlen;
	bool ok;

	DBG_DEBUG("send handler\n");

	if (!ctdbd_conn_has_async_sends(conn)) {
		/* Spurious wakeup: nothing queued, stop the events */
		DBG_WARNING("Writable fd-event without pending send\n");
		TEVENT_FD_NOT_WRITEABLE(conn->fde);
		return 0;
	}

	state = conn->send_list;
	iovlen = iov_buflen(state->iov, state->iovcnt);

	nwritten = writev(conn->fd, state->iov, state->iovcnt);
	if (nwritten == -1) {
		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
			DBG_ERR("writev failed: %s\n", strerror(errno));
			cluster_fatal("cluster write error\n");
		}
		DBG_DEBUG("recoverable writev error, retry\n");
		return 0;
	}

	if (nwritten < iovlen) {
		/* Short write: advance the iov and wait for the next event */
		DBG_DEBUG("short write\n");

		ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
		if (!ok) {
			DBG_ERR("iov_advance failed\n");
			if (state->req == NULL) {
				/* Cancelled request: just drop the state */
				TALLOC_FREE(state);
				return 0;
			}
			tevent_req_error(state->req, EIO);
			return 0;
		}
		return 0;
	}

	if (state->req == NULL) {
		/* Fully flushed a cancelled request: free the orphan state */
		DBG_DEBUG("Finished sending cancelled reqid [%" PRIu32 "]\n",
			  state->reqid);
		TALLOC_FREE(state);
		return 0;
	}

	DBG_DEBUG("Finished send request id [%" PRIu32 "]\n", state->reqid);

	tevent_req_done(state->req);
	return 0;
}
1539 static int ctdb_pkt_send_recv(struct tevent_req *req)
1541 int ret;
1543 if (tevent_req_is_unix_error(req, &ret)) {
1544 tevent_req_received(req);
1545 return ret;
1548 tevent_req_received(req);
1549 return 0;
/*
 * Per-request state for one asynchronous packet receive from ctdbd.
 * Instances live on ctdbd_connection->recv_list until a packet with a
 * matching reqid arrives (see ctdb_pkt_recv_handler()).
 */
struct ctdb_pkt_recv_state {
	/* list membership in conn->recv_list */
	struct ctdb_pkt_recv_state *prev, *next;
	struct tevent_context *ev;
	struct ctdbd_connection *conn;

	/* ctdb request id */
	uint32_t reqid;

	/* the associated tevent_req */
	struct tevent_req *req;

	/* pointer to allocated ctdb packet buffer */
	struct ctdb_req_header *hdr;
};
1567 static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
1568 enum tevent_req_state req_state);
1570 static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
1571 struct tevent_context *ev,
1572 struct ctdbd_connection *conn,
1573 uint32_t reqid)
1575 struct tevent_req *req = NULL;
1576 struct ctdb_pkt_recv_state *state = NULL;
1578 req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_recv_state);
1579 if (req == NULL) {
1580 return NULL;
1583 *state = (struct ctdb_pkt_recv_state) {
1584 .ev = ev,
1585 .conn = conn,
1586 .reqid = reqid,
1587 .req = req,
1590 tevent_req_set_cleanup_fn(req, ctdb_pkt_recv_cleanup);
1593 * fd-readable event is always set for the fde, no need to deal with
1594 * that here.
1597 DLIST_ADD_END(conn->recv_list, state);
1598 DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", state->reqid);
1600 return req;
1603 static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
1604 enum tevent_req_state req_state)
1606 struct ctdb_pkt_recv_state *state = tevent_req_data(
1607 req, struct ctdb_pkt_recv_state);
1608 struct ctdbd_connection *conn = state->conn;
1610 if (conn == NULL) {
1611 return;
1613 state->conn = NULL;
1614 DLIST_REMOVE(conn->recv_list, state);
/*
 * fd-readable event handler: drive the two-phase packet read from the
 * ctdbd socket across possibly many short reads.
 *
 * Phase 1 reads the 4-byte message length into read_state.msglen;
 * phase 2 allocates a buffer of that size and reads the rest of the
 * packet. A complete packet is matched by reqid against conn->recv_list
 * and handed to the waiting request, or discarded if nobody waits.
 * Returns 0 or an errno value.
 */
static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
{
	struct ctdb_pkt_recv_state *state = NULL;
	ssize_t nread;
	ssize_t iovlen;
	bool ok;

	DBG_DEBUG("receive handler\n");

	if (conn->read_state.iovs == NULL) {
		/* Fresh packet: start by reading the 4-byte length field */
		conn->read_state.iov.iov_base = &conn->read_state.msglen;
		conn->read_state.iov.iov_len = sizeof(conn->read_state.msglen);
		conn->read_state.iovs = &conn->read_state.iov;
		conn->read_state.iovcnt = 1;
	}

	iovlen = iov_buflen(conn->read_state.iovs, conn->read_state.iovcnt);

	DBG_DEBUG("iovlen [%zd]\n", iovlen);

	nread = readv(conn->fd, conn->read_state.iovs, conn->read_state.iovcnt);
	if (nread == 0) {
		cluster_fatal("cluster read error, peer closed connection\n");
	}
	if (nread == -1) {
		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
			cluster_fatal("cluster read error\n");
		}
		DBG_DEBUG("recoverable error from readv, retry\n");
		return 0;
	}

	if (nread < iovlen) {
		/* Short read: remember progress, wait for the next event */
		DBG_DEBUG("iovlen [%zd] nread [%zd]\n", iovlen, nread);
		ok = iov_advance(&conn->read_state.iovs,
				 &conn->read_state.iovcnt,
				 nread);
		if (!ok) {
			return EIO;
		}
		return 0;
	}

	/* Current read target complete */
	conn->read_state.iovs = NULL;
	conn->read_state.iovcnt = 0;

	if (conn->read_state.hdr == NULL) {
		/*
		 * Going this way after reading the 4 initial byte message
		 * length
		 */
		uint32_t msglen = conn->read_state.msglen;
		uint8_t *readbuf = NULL;
		size_t readlen;

		DBG_DEBUG("msglen: %" PRIu32 "\n", msglen);

		if (msglen < sizeof(struct ctdb_req_header)) {
			DBG_ERR("short message %" PRIu32 "\n", msglen);
			return EIO;
		}

		conn->read_state.hdr = talloc_size(conn, msglen);
		if (conn->read_state.hdr == NULL) {
			return ENOMEM;
		}
		/* hdr->length overlaps the already-read length field */
		conn->read_state.hdr->length = msglen;
		talloc_set_name_const(conn->read_state.hdr,
				      "struct ctdb_req_header");

		/* Schedule reading the remainder of the packet */
		readbuf = (uint8_t *)conn->read_state.hdr + sizeof(msglen);
		readlen = msglen - sizeof(msglen);

		conn->read_state.iov.iov_base = readbuf;
		conn->read_state.iov.iov_len = readlen;
		conn->read_state.iovs = &conn->read_state.iov;
		conn->read_state.iovcnt = 1;

		DBG_DEBUG("Scheduled packet read size %zd\n", readlen);
		return 0;
	}

	/*
	 * Searching a list here is expected to be cheap, as messages are
	 * expected to be coming in more or less ordered and we should find the
	 * waiting request near the beginning of the list.
	 */
	for (state = conn->recv_list; state != NULL; state = state->next) {
		if (state->reqid == conn->read_state.hdr->reqid) {
			break;
		}
	}

	if (state == NULL) {
		/* No waiter for this reqid: drop the packet */
		DBG_ERR("Discarding async ctdb reqid %u\n",
			conn->read_state.hdr->reqid);
		TALLOC_FREE(conn->read_state.hdr);
		ZERO_STRUCT(conn->read_state);
		return EINVAL;
	}

	DBG_DEBUG("Got reply for reqid [%" PRIu32 "]\n", state->reqid);

	/* Hand the packet buffer to the waiting request and reset */
	state->hdr = talloc_move(state, &conn->read_state.hdr);
	ZERO_STRUCT(conn->read_state);
	tevent_req_done(state->req);
	return 0;
}
1726 static int ctdb_pkt_recv_recv(struct tevent_req *req,
1727 TALLOC_CTX *mem_ctx,
1728 struct ctdb_req_header **_hdr)
1730 struct ctdb_pkt_recv_state *state = tevent_req_data(
1731 req, struct ctdb_pkt_recv_state);
1732 int error;
1734 if (tevent_req_is_unix_error(req, &error)) {
1735 DBG_ERR("ctdb_read_req failed %s\n", strerror(error));
1736 tevent_req_received(req);
1737 return error;
1740 *_hdr = talloc_move(mem_ctx, &state->hdr);
1742 tevent_req_received(req);
1743 return 0;
/*
 * Talloc destructor for a ctdbd connection: tear down the fd/event
 * state and fail every pending async send and receive with EIO.
 *
 * Callbacks are deferred via tevent so completion runs from the event
 * loop, not from inside this destructor.
 */
static int ctdbd_connection_destructor(struct ctdbd_connection *c)
{
	/* Drop the fd event before closing the fd it watches */
	TALLOC_FREE(c->fde);
	if (c->fd != -1) {
		close(c->fd);
		c->fd = -1;
	}

	/* Discard any partially-read packet */
	TALLOC_FREE(c->read_state.hdr);
	ZERO_STRUCT(c->read_state);

	while (c->send_list != NULL) {
		struct ctdb_pkt_send_state *send_state = c->send_list;
		/* Detach first so the request's cleanup fn does not
		 * touch the list we are draining */
		DLIST_REMOVE(c->send_list, send_state);
		send_state->conn = NULL;
		tevent_req_defer_callback(send_state->req, send_state->ev);
		tevent_req_error(send_state->req, EIO);
	}

	while (c->recv_list != NULL) {
		struct ctdb_pkt_recv_state *recv_state = c->recv_list;
		DLIST_REMOVE(c->recv_list, recv_state);
		recv_state->conn = NULL;
		tevent_req_defer_callback(recv_state->req, recv_state->ev);
		tevent_req_error(recv_state->req, EIO);
	}

	return 0;
}
/*
 * State for ctdbd_parse_send/recv: one async CTDB_REQ_CALL
 * (CTDB_FETCH_FUNC) record fetch whose reply data is handed to a
 * caller-supplied parser callback.
 */
struct ctdbd_parse_state {
	struct tevent_context *ev;
	struct ctdbd_connection *conn;
	/* reqid allocated for this request */
	uint32_t reqid;
	/* copy of the record key; dptr points at _keybuf for small keys */
	TDB_DATA key;
	/* inline buffer avoiding a talloc for keys up to 64 bytes */
	uint8_t _keybuf[64];
	/* the wire request header built in ctdbd_parse_send() */
	struct ctdb_req_call_old ctdb_req;
	/* [0] = fixed header part, [1] = key bytes */
	struct iovec iov[2];
	/* invoked with (key, data) once the reply arrives */
	void (*parser)(TDB_DATA key,
		       TDB_DATA data,
		       void *private_data);
	void *private_data;
	/* caller-visible progress indicator, updated as the send advances */
	enum dbwrap_req_state *req_state;
};
static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
static void ctdbd_parse_done(struct tevent_req *subreq);

/**
 * Async fetch of a record from a ctdb-backed database.
 *
 * Builds a CTDB_REQ_CALL/CTDB_FETCH_FUNC packet for (db_id, key) and
 * sends it via ctdb_pkt_send_send(); on reply the parser callback is
 * invoked with the record data (see ctdbd_parse_done()).
 *
 * @param local_copy  if true, request a read-only copy
 *                    (CTDB_WANT_READONLY)
 * @param req_state   out: tracks queued/dispatched/error progress for
 *                    the caller
 */
struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdbd_connection *conn,
				    uint32_t db_id,
				    TDB_DATA key,
				    bool local_copy,
				    void (*parser)(TDB_DATA key,
						   TDB_DATA data,
						   void *private_data),
				    void *private_data,
				    enum dbwrap_req_state *req_state)
{
	struct tevent_req *req = NULL;
	struct ctdbd_parse_state *state = NULL;
	uint32_t flags;
	uint32_t packet_length;
	struct tevent_req *subreq = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
	if (req == NULL) {
		*req_state = DBWRAP_REQ_ERROR;
		return NULL;
	}

	*state = (struct ctdbd_parse_state) {
		.ev = ev,
		.conn = conn,
		.reqid = ctdbd_next_reqid(conn),
		.parser = parser,
		.private_data = private_data,
		.req_state = req_state,
	};

	flags = local_copy ? CTDB_WANT_READONLY : 0;
	packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;

	/*
	 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires that
	 * all passed iov elements have a lifetime longer that the tevent_req
	 * returned by ctdb_pkt_send_send(). This is required continue sending a
	 * the low level request into the ctdb socket, if a higher level
	 * ('this') request is canceled (or talloc free'd) by the application
	 * layer, without sending invalid packets to ctdb.
	 */
	if (key.dsize > sizeof(state->_keybuf)) {
		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
		if (tevent_req_nomem(state->key.dptr, req)) {
			return tevent_req_post(req, ev);
		}
	} else {
		/* Small key: use the inline buffer, no allocation */
		memcpy(state->_keybuf, key.dptr, key.dsize);
		state->key.dptr = state->_keybuf;
	}
	state->key.dsize = key.dsize;

	/* Fill in the wire header for the fetch call */
	state->ctdb_req.hdr.length = packet_length;
	state->ctdb_req.hdr.ctdb_magic = CTDB_MAGIC;
	state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
	state->ctdb_req.hdr.operation = CTDB_REQ_CALL;
	state->ctdb_req.hdr.reqid = state->reqid;
	state->ctdb_req.flags = flags;
	state->ctdb_req.callid = CTDB_FETCH_FUNC;
	state->ctdb_req.db_id = db_id;
	state->ctdb_req.keylen = state->key.dsize;

	state->iov[0].iov_base = &state->ctdb_req;
	state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	state->iov[1].iov_base = state->key.dptr;
	state->iov[1].iov_len = state->key.dsize;

	/*
	 * Note that ctdb_pkt_send_send()
	 * will modify state->iov using
	 * iov_advance() without making a copy.
	 */
	subreq = ctdb_pkt_send_send(state,
				    ev,
				    conn,
				    state->reqid,
				    state->iov,
				    ARRAY_SIZE(state->iov),
				    req_state);
	if (tevent_req_nomem(subreq, req)) {
		*req_state = DBWRAP_REQ_ERROR;
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);

	return req;
}
/*
 * Completion callback for the packet send phase of ctdbd_parse_send():
 * on success, post the receive for the matching reply and mark the
 * request as dispatched.
 */
static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_parse_state *state = tevent_req_data(
		req, struct ctdbd_parse_state);
	int ret;

	ret = ctdb_pkt_send_recv(subreq);
	TALLOC_FREE(subreq);
	/* tevent_req_error() returns true (and fails req) iff ret != 0 */
	if (tevent_req_error(req, ret)) {
		DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret));
		return;
	}

	/* Request is on the wire: wait for the reply with our reqid */
	subreq = ctdb_pkt_recv_send(state,
				    state->ev,
				    state->conn,
				    state->reqid);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}

	*state->req_state = DBWRAP_REQ_DISPATCHED;
	tevent_req_set_callback(subreq, ctdbd_parse_done, req);
	return;
}
/*
 * Completion callback for the reply phase of ctdbd_parse_send():
 * validate the reply packet, then hand the record data to the
 * caller's parser. An empty record maps to ENOENT.
 */
static void ctdbd_parse_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_parse_state *state = tevent_req_data(
		req, struct ctdbd_parse_state);
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply = NULL;
	int ret;

	DBG_DEBUG("async parse request finished\n");

	/* hdr becomes talloc-owned by state on success */
	ret = ctdb_pkt_recv_recv(subreq, state, &hdr);
	TALLOC_FREE(subreq);
	if (tevent_req_error(req, ret)) {
		DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret));
		return;
	}

	if (hdr->operation != CTDB_REPLY_CALL) {
		DBG_ERR("received invalid reply\n");
		ctdb_packet_dump(hdr);
		tevent_req_error(req, EIO);
		return;
	}

	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		tevent_req_error(req, ENOENT);
		return;
	}

	state->parser(state->key,
		      make_tdb_data(&reply->data[0], reply->datalen),
		      state->private_data);

	tevent_req_done(req);
	return;
}
1957 int ctdbd_parse_recv(struct tevent_req *req)
1959 int error;
1961 if (tevent_req_is_unix_error(req, &error)) {
1962 DBG_DEBUG("async parse returned %s\n", strerror(error));
1963 tevent_req_received(req);
1964 return error;
1967 tevent_req_received(req);
1968 return 0;