/*
   Unix SMB/CIFS implementation.
   Samba internal messaging functions
   Copyright (C) 2007 by Volker Lendecke
   Copyright (C) 2007 by Andrew Tridgell

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include <tevent.h>
#include "util_tdb.h"
#include "serverid.h"
#include "ctdbd_conn.h"
#include "system/select.h"
#include "lib/util/util_net.h"
#include "lib/util/sys_rw_data.h"
#include "lib/util/iov_buf.h"
#include "lib/util/select.h"
#include "lib/util/debug.h"
#include "lib/util/talloc_stack.h"
#include "lib/util/genrand.h"
#include "lib/util/fault.h"
#include "lib/util/dlinklist.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/sys_rw.h"
#include "lib/util/blocking.h"
#include "ctdb/include/ctdb_protocol.h"
#include "lib/async_req/async_sock.h"

/* paths to these include files come from --with-ctdb= in configure */

struct ctdbd_srvid_cb {
	uint64_t srvid;
	int (*cb)(struct tevent_context *ev,
		  uint32_t src_vnn, uint32_t dst_vnn,
		  uint64_t dst_srvid,
		  const uint8_t *msg, size_t msglen,
		  void *private_data);
	void *private_data;
};

struct ctdbd_connection {
	uint32_t reqid;
	uint32_t our_vnn;
	uint64_t rand_srvid;
	struct ctdbd_srvid_cb *callbacks;
	int fd;
	int timeout;

	/*
	 * Outgoing queue for writev_send of asynchronous ctdb requests
	 */
	struct tevent_queue *outgoing;
	struct tevent_req **pending;
	struct tevent_req *read_req;
};

static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
{
	size_t len = talloc_array_length(conn->pending);
	return (len != 0);
}

static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
{
	conn->reqid += 1;
	if (conn->reqid == 0) {
		conn->reqid += 1;
	}
	return conn->reqid;
}

static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn, uint32_t opcode,
			 uint64_t srvid, uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			 int32_t *cstatus);

/*
 * exit on fatal communications errors with the ctdbd daemon
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
	/* we don't use smb_panic() as we don't want to delay to write
	   a core file. We need to release this process id immediately
	   so that someone else can take over without getting sharing
	   violations */
	_exit(1);
}

static void ctdb_packet_dump(struct ctdb_req_header *hdr)
{
	if (DEBUGLEVEL < 11) {
		return;
	}
	DEBUGADD(11, ("len=%"PRIu32", magic=%"PRIu32", vers=%"PRIu32", "
		      "gen=%"PRIu32", op=%"PRIu32", reqid=%"PRIu32"\n",
		      hdr->length,
		      hdr->ctdb_magic,
		      hdr->ctdb_version,
		      hdr->generation,
		      hdr->operation,
		      hdr->reqid));
}

/*
 * Register a srvid with ctdbd
 */
int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
			int (*cb)(struct tevent_context *ev,
				  uint32_t src_vnn, uint32_t dst_vnn,
				  uint64_t dst_srvid,
				  const uint8_t *msg, size_t msglen,
				  void *private_data),
			void *private_data)
{
	int ret;
	int32_t cstatus;
	size_t num_callbacks;
	struct ctdbd_srvid_cb *tmp;

	ret = ctdbd_control_local(conn, CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
				  tdb_null, NULL, NULL, &cstatus);
	if (ret != 0) {
		return ret;
	}

	num_callbacks = talloc_array_length(conn->callbacks);

	tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
			     num_callbacks + 1);
	if (tmp == NULL) {
		return ENOMEM;
	}
	conn->callbacks = tmp;

	conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
		.srvid = srvid, .cb = cb, .private_data = private_data
	};

	return 0;
}
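
/*
 * Example (illustrative sketch, not part of this module): registering a
 * message handler for a srvid. The srvid value, the callback name and the
 * "state" pointer are hypothetical; any uint64_t channel id agreed upon
 * with the sender works the same way.
 *
 *	static int example_msg_cb(struct tevent_context *ev,
 *				  uint32_t src_vnn, uint32_t dst_vnn,
 *				  uint64_t dst_srvid,
 *				  const uint8_t *msg, size_t msglen,
 *				  void *private_data)
 *	{
 *		DBG_DEBUG("got %zu bytes from vnn %"PRIu32"\n",
 *			  msglen, src_vnn);
 *		return 0;
 *	}
 *
 *	ret = register_with_ctdbd(conn, 0x1234000000000000ULL,
 *				  example_msg_cb, state);
 *	if (ret != 0) {
 *		return ret;
 *	}
 */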
static int ctdbd_msg_call_back(struct tevent_context *ev,
			       struct ctdbd_connection *conn,
			       struct ctdb_req_message_old *msg)
{
	uint32_t msg_len;
	size_t i, num_callbacks;

	msg_len = msg->hdr.length;
	if (msg_len < offsetof(struct ctdb_req_message_old, data)) {
		DBG_DEBUG("len %"PRIu32" too small\n", msg_len);
		return 0;
	}
	msg_len -= offsetof(struct ctdb_req_message_old, data);

	if (msg_len < msg->datalen) {
		DBG_DEBUG("msg_len=%"PRIu32" < msg->datalen=%"PRIu32"\n",
			  msg_len, msg->datalen);
		return 0;
	}

	num_callbacks = talloc_array_length(conn->callbacks);

	for (i=0; i<num_callbacks; i++) {
		struct ctdbd_srvid_cb *cb = &conn->callbacks[i];

		if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
			int ret;

			ret = cb->cb(ev,
				     msg->hdr.srcnode, msg->hdr.destnode,
				     msg->srvid, msg->data, msg->datalen,
				     cb->private_data);
			if (ret != 0) {
				return ret;
			}
		}
	}
	return 0;
}

/*
 * get our vnn from the cluster
 */
static int get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
{
	int32_t cstatus=-1;
	int ret;
	ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_PNN, 0, 0,
				  tdb_null, NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
		return ret;
	}
	*vnn = (uint32_t)cstatus;
	return ret;
}

/*
 * Are we active (i.e. not banned or stopped?)
 */
static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
{
	int32_t cstatus=-1;
	TDB_DATA outdata = {0};
	struct ctdb_node_map_old *m;
	bool ok = false;
	uint32_t i;
	int ret;

	ret = ctdbd_control_local(conn, CTDB_CONTROL_GET_NODEMAP, 0, 0,
				  tdb_null, talloc_tos(), &outdata, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
		return false;
	}
	if ((cstatus != 0) || (outdata.dptr == NULL)) {
		DEBUG(2, ("Received invalid ctdb data\n"));
		goto fail;
	}

	m = (struct ctdb_node_map_old *)outdata.dptr;

	for (i=0; i<m->num; i++) {
		if (vnn == m->nodes[i].pnn) {
			break;
		}
	}

	if (i == m->num) {
		DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
			  (int)vnn));
		goto fail;
	}

	if ((m->nodes[i].flags & NODE_FLAGS_INACTIVE) != 0) {
		DEBUG(2, ("Node has status %x, not active\n",
			  (int)m->nodes[i].flags));
		goto fail;
	}

	ok = true;
fail:
	TALLOC_FREE(outdata.dptr);
	return ok;
}

uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
{
	return conn->our_vnn;
}

/*
 * Get us a ctdb connection
 */

static int ctdbd_connect(const char *sockname, int *pfd)
{
	struct samba_sockaddr addr = {
		.sa_socklen = sizeof(struct sockaddr_un),
		.u = {
			.un = {
				.sun_family = AF_UNIX,
			},
		},
	};
	int fd;
	size_t namelen;
	int ret;

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd == -1) {
		int err = errno;
		DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
		return err;
	}

	namelen = strlcpy(addr.u.un.sun_path,
			  sockname,
			  sizeof(addr.u.un.sun_path));
	if (namelen >= sizeof(addr.u.un.sun_path)) {
		DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
			  sockname));
		close(fd);
		return ENAMETOOLONG;
	}

	ret = connect(fd, &addr.u.sa, addr.sa_socklen);
	if (ret == -1) {
		int err = errno;
		DEBUG(1, ("connect(%s) failed: %s\n", sockname,
			  strerror(err)));
		close(fd);
		return err;
	}

	*pfd = fd;
	return 0;
}

static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
			    struct ctdb_req_header **result)
{
	struct ctdb_req_header *req;
	uint32_t msglen;
	ssize_t nread;

	if (timeout != -1) {
		struct pollfd pfd = { .fd = fd, .events = POLLIN };
		int ret;

		ret = sys_poll_intr(&pfd, 1, timeout);
		if (ret == -1) {
			return errno;
		}
		if (ret == 0) {
			return ETIMEDOUT;
		}
		if (ret != 1) {
			return EIO;
		}
	}

	nread = read_data(fd, &msglen, sizeof(msglen));
	if (nread == -1) {
		return errno;
	}
	if (nread == 0) {
		return EIO;
	}

	if (msglen < sizeof(struct ctdb_req_header)) {
		return EIO;
	}

	req = talloc_size(mem_ctx, msglen);
	if (req == NULL) {
		return ENOMEM;
	}
	talloc_set_name_const(req, "struct ctdb_req_header");

	req->length = msglen;

	nread = read_data(fd, ((char *)req) + sizeof(msglen),
			  msglen - sizeof(msglen));
	if (nread == -1) {
		TALLOC_FREE(req);
		return errno;
	}
	if (nread == 0) {
		TALLOC_FREE(req);
		return EIO;
	}

	*result = req;
	return 0;
}
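
/*
 * Note on the wire framing assumed above: every ctdb packet starts with a
 * 32-bit length field that counts the complete packet including the length
 * field itself (it is the first member of struct ctdb_req_header). That is
 * why the code first reads sizeof(msglen) bytes, allocates msglen bytes for
 * the whole packet, stores msglen into req->length and then reads the
 * remaining msglen - sizeof(msglen) bytes behind it.
 */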
/*
 * Read a full ctdbd request. If we have a messaging context, defer incoming
 * messages that might come in between.
 */

static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
			 TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
{
	struct ctdb_req_header *hdr = NULL;
	int ret;

next_pkt:

	ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
	if (ret != 0) {
		DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
		cluster_fatal("failed to read data from ctdbd\n");
		return -1;
	}
	SMB_ASSERT(hdr != NULL);

	DEBUG(11, ("Received ctdb packet\n"));
	ctdb_packet_dump(hdr);

	if (hdr->operation == CTDB_REQ_MESSAGE) {
		struct ctdb_req_message_old *msg = (struct ctdb_req_message_old *)hdr;

		ret = ctdbd_msg_call_back(NULL, conn, msg);
		if (ret != 0) {
			TALLOC_FREE(hdr);
			return ret;
		}

		TALLOC_FREE(hdr);
		goto next_pkt;
	}

	if ((reqid != 0) && (hdr->reqid != reqid)) {
		/* we got the wrong reply */
		DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
			 "been %u\n", hdr->reqid, reqid));
		TALLOC_FREE(hdr);
		goto next_pkt;
	}

	*result = talloc_move(mem_ctx, &hdr);

	return 0;
}

static int ctdbd_connection_destructor(struct ctdbd_connection *c);

/*
 * Get us a ctdbd connection
 */

static int ctdbd_init_connection_internal(TALLOC_CTX *mem_ctx,
					  const char *sockname, int timeout,
					  struct ctdbd_connection *conn)
{
	int ret;

	conn->timeout = timeout;
	if (conn->timeout == 0) {
		conn->timeout = -1;
	}

	ret = ctdbd_connect(sockname, &conn->fd);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
		return ret;
	}
	talloc_set_destructor(conn, ctdbd_connection_destructor);

	ret = get_cluster_vnn(conn, &conn->our_vnn);
	if (ret != 0) {
		DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret)));
		return ret;
	}

	if (!ctdbd_working(conn, conn->our_vnn)) {
		DEBUG(2, ("Node is not working, can not connect\n"));
		return EIO;
	}

	generate_random_buffer((unsigned char *)&conn->rand_srvid,
			       sizeof(conn->rand_srvid));

	ret = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
	if (ret != 0) {
		DEBUG(5, ("Could not register random srvid: %s\n",
			  strerror(ret)));
		return ret;
	}

	return 0;
}

int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
			  const char *sockname, int timeout,
			  struct ctdbd_connection **pconn)
{
	struct ctdbd_connection *conn;
	int ret;

	if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
		DEBUG(0, ("talloc failed\n"));
		return ENOMEM;
	}

	ret = ctdbd_init_connection_internal(mem_ctx,
					     sockname,
					     timeout,
					     conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
			strerror(ret));
		goto fail;
	}

	*pconn = conn;
	return 0;

fail:
	TALLOC_FREE(conn);
	return ret;
}
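
/*
 * Example (illustrative sketch): creating and tearing down a blocking
 * connection. The socket path literal is an assumption; in Samba it
 * normally comes from configuration rather than a hard-coded string.
 * A timeout of 0 means "wait forever", as it is mapped to -1 above.
 *
 *	struct ctdbd_connection *conn = NULL;
 *	int ret;
 *
 *	ret = ctdbd_init_connection(talloc_tos(),
 *				    "/var/run/ctdb/ctdbd.socket",
 *				    0,
 *				    &conn);
 *	if (ret != 0) {
 *		DBG_ERR("ctdbd_init_connection failed: %s\n", strerror(ret));
 *		return ret;
 *	}
 *	... use conn ...
 *	TALLOC_FREE(conn);	(closes the socket via the destructor)
 */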
int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
			    const char *sockname, int timeout,
			    struct ctdbd_connection *conn)
{
	int ret;

	ret = ctdbd_connection_destructor(conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_connection_destructor failed\n");
		return ret;
	}

	ret = ctdbd_init_connection_internal(mem_ctx,
					     sockname,
					     timeout,
					     conn);
	if (ret != 0) {
		DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
			strerror(ret));
		return ret;
	}

	return 0;
}

int ctdbd_init_async_connection(
	TALLOC_CTX *mem_ctx,
	const char *sockname,
	int timeout,
	struct ctdbd_connection **pconn)
{
	struct ctdbd_connection *conn = NULL;
	int ret;

	ret = ctdbd_init_connection(mem_ctx, sockname, timeout, &conn);
	if (ret != 0) {
		return ret;
	}

	ret = set_blocking(conn->fd, false);
	if (ret == -1) {
		int err = errno;
		TALLOC_FREE(conn);
		return err;
	}

	conn->outgoing = tevent_queue_create(conn, "ctdb async outgoing");
	if (conn->outgoing == NULL) {
		TALLOC_FREE(conn);
		return ENOMEM;
	}

	*pconn = conn;
	return 0;
}

int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
{
	return conn->fd;
}

/*
 * Packet handler to receive and handle a ctdb message
 */
static int ctdb_handle_message(struct tevent_context *ev,
			       struct ctdbd_connection *conn,
			       struct ctdb_req_header *hdr)
{
	struct ctdb_req_message_old *msg;

	if (hdr->operation != CTDB_REQ_MESSAGE) {
		DEBUG(0, ("Received async msg of type %u, discarding\n",
			  hdr->operation));
		return EINVAL;
	}

	msg = (struct ctdb_req_message_old *)hdr;

	ctdbd_msg_call_back(ev, conn, msg);

	return 0;
}

void ctdbd_socket_readable(struct tevent_context *ev,
			   struct ctdbd_connection *conn)
{
	struct ctdb_req_header *hdr = NULL;
	int ret;

	ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
	if (ret != 0) {
		DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
		cluster_fatal("failed to read data from ctdbd\n");
	}
	SMB_ASSERT(hdr != NULL);

	ret = ctdb_handle_message(ev, conn, hdr);

	TALLOC_FREE(hdr);

	if (ret != 0) {
		DEBUG(10, ("could not handle incoming message: %s\n",
			   strerror(ret)));
	}
}

int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
			     uint32_t dst_vnn, uint64_t dst_srvid,
			     const struct iovec *iov, int iovlen)
{
	struct ctdb_req_message_old r;
	struct iovec iov2[iovlen+1];
	size_t buflen = iov_buflen(iov, iovlen);
	ssize_t nwritten;

	r.hdr.length = offsetof(struct ctdb_req_message_old, data) + buflen;
	r.hdr.ctdb_magic = CTDB_MAGIC;
	r.hdr.ctdb_version = CTDB_PROTOCOL;
	r.hdr.generation = 1;
	r.hdr.operation = CTDB_REQ_MESSAGE;
	r.hdr.destnode = dst_vnn;
	r.hdr.srcnode = conn->our_vnn;
	r.hdr.reqid = 0;
	r.srvid = dst_srvid;
	r.datalen = buflen;

	DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
	ctdb_packet_dump(&r.hdr);

	iov2[0].iov_base = &r;
	iov2[0].iov_len = offsetof(struct ctdb_req_message_old, data);
	memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));

	nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	return 0;
}
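
/*
 * Example (illustrative sketch): sending a single buffer to a destination
 * node and srvid. The payload, the destination vnn and the srvid value are
 * hypothetical; the receiver must have registered the same srvid.
 *
 *	uint8_t payload[] = "hello";
 *	struct iovec iov = {
 *		.iov_base = payload,
 *		.iov_len = sizeof(payload),
 *	};
 *
 *	ret = ctdbd_messaging_send_iov(conn,
 *				       dst_vnn,
 *				       0x1234000000000000ULL,
 *				       &iov, 1);
 */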
/*
 * send/recv a generic ctdb control message
 */
static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn, uint32_t opcode,
			 uint64_t srvid, uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			 int32_t *cstatus)
{
	struct ctdb_req_control_old req;
	struct ctdb_req_header *hdr;
	struct ctdb_reply_control_old *reply = NULL;
	struct iovec iov[2];
	ssize_t nwritten;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	ZERO_STRUCT(req);
	req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CONTROL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.hdr.destnode = vnn;
	req.opcode = opcode;
	req.srvid = srvid;
	req.datalen = data.dsize;
	req.flags = flags;

	DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32", vnn=%"PRIu32", "
		  "opcode=%"PRIu32", srvid=%"PRIu64"\n", req.hdr.reqid,
		  req.hdr.destnode, req.opcode, req.srvid);
	ctdb_packet_dump(&req.hdr);

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_control_old, data);
	iov[1].iov_base = data.dptr;
	iov[1].iov_len = data.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	if (flags & CTDB_CTRL_FLAG_NOREPLY) {
		if (cstatus) {
			*cstatus = 0;
		}
		return 0;
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		return ret;
	}

	if (hdr->operation != CTDB_REPLY_CONTROL) {
		DEBUG(0, ("received invalid reply\n"));
		TALLOC_FREE(hdr);
		return EIO;
	}
	reply = (struct ctdb_reply_control_old *)hdr;

	if (outdata) {
		if (!(outdata->dptr = (uint8_t *)talloc_memdup(
			      mem_ctx, reply->data, reply->datalen))) {
			TALLOC_FREE(reply);
			return ENOMEM;
		}
		outdata->dsize = reply->datalen;
	}
	if (cstatus) {
		(*cstatus) = reply->status;
	}

	TALLOC_FREE(reply);
	return ret;
}

/*
 * see if a remote process exists
 */
bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn,
			  pid_t pid, uint64_t unique_id)
{
	uint8_t buf[sizeof(pid)+sizeof(unique_id)];
	int32_t cstatus = 0;
	int ret;

	if (unique_id == SERVERID_UNIQUE_ID_NOT_TO_VERIFY) {
		ret = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS,
				    0, 0,
				    (TDB_DATA) { .dptr = (uint8_t *)&pid,
						 .dsize = sizeof(pid) },
				    NULL, NULL, &cstatus);
		if (ret != 0) {
			return false;
		}
		return (cstatus == 0);
	}

	memcpy(buf, &pid, sizeof(pid));
	memcpy(buf+sizeof(pid), &unique_id, sizeof(unique_id));

	ret = ctdbd_control(conn, vnn, CTDB_CONTROL_CHECK_PID_SRVID, 0, 0,
			    (TDB_DATA) { .dptr = buf, .dsize = sizeof(buf) },
			    NULL, NULL, &cstatus);
	if (ret != 0) {
		return false;
	}
	return (cstatus == 0);
}
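
/*
 * Example (illustrative sketch): checking whether a process with a known
 * (vnn, pid, unique_id) triple is still alive. Passing
 * SERVERID_UNIQUE_ID_NOT_TO_VERIFY skips the unique-id check and only asks
 * ctdbd whether the pid exists on that node. The "id" variable is a
 * hypothetical struct server_id.
 *
 *	bool alive = ctdbd_process_exists(conn, id.vnn, id.pid, id.unique_id);
 *	if (!alive) {
 *		... treat the record owner as dead ...
 *	}
 */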
/*
 * Get a db path
 */
char *ctdbd_dbpath(struct ctdbd_connection *conn,
		   TALLOC_CTX *mem_ctx, uint32_t db_id)
{
	int ret;
	TDB_DATA data;
	TDB_DATA rdata = {0};
	int32_t cstatus = 0;

	data.dptr = (uint8_t*)&db_id;
	data.dsize = sizeof(db_id);

	ret = ctdbd_control_local(conn, CTDB_CONTROL_GETDBPATH, 0, 0, data,
				  mem_ctx, &rdata, &cstatus);
	if ((ret != 0) || cstatus != 0) {
		DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s\n",
			  strerror(ret)));
		TALLOC_FREE(rdata.dptr);
	}

	return (char *)rdata.dptr;
}

/*
 * attach to a ctdb database
 */
int ctdbd_db_attach(struct ctdbd_connection *conn,
		    const char *name, uint32_t *db_id, bool persistent)
{
	int ret;
	TDB_DATA data = {0};
	int32_t cstatus;

	data = string_term_tdb_data(name);

	ret = ctdbd_control_local(conn,
				  persistent
				  ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
				  : CTDB_CONTROL_DB_ATTACH,
				  0, 0, data, NULL, &data, &cstatus);
	if (ret != 0) {
		DEBUG(0, (__location__ " ctdb_control for db_attach "
			  "failed: %s\n", strerror(ret)));
		return ret;
	}

	if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
		DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
		TALLOC_FREE(data.dptr);
		return EIO;
	}

	*db_id = *(uint32_t *)data.dptr;
	talloc_free(data.dptr);

	return 0;
}
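
/*
 * Example (illustrative sketch): attaching to a database and pulling a
 * record to the local node before operating on it. The database name and
 * key are hypothetical.
 *
 *	uint32_t db_id;
 *	TDB_DATA key = string_term_tdb_data("some key");
 *
 *	ret = ctdbd_db_attach(conn, "example.tdb", &db_id, false);
 *	if (ret != 0) {
 *		return ret;
 *	}
 *	ret = ctdbd_migrate(conn, db_id, key);
 */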
/*
 * force the migration of a record to this node
 */
int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
{
	struct ctdb_req_call_old req;
	struct ctdb_req_header *hdr = NULL;
	struct iovec iov[2];
	ssize_t nwritten;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	ZERO_STRUCT(req);

	req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CALL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.flags = CTDB_IMMEDIATE_MIGRATION;
	req.callid = CTDB_NULL_FUNC;
	req.db_id = db_id;
	req.keylen = key.dsize;

	DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
	ctdb_packet_dump(&req.hdr);

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	iov[1].iov_base = key.dptr;
	iov[1].iov_len = key.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		goto fail;
	}

	if (hdr->operation != CTDB_REPLY_CALL) {
		if (hdr->operation == CTDB_REPLY_ERROR) {
			DBG_ERR("received error from ctdb\n");
		} else {
			DBG_ERR("received invalid reply\n");
		}
		ret = EIO;
		goto fail;
	}

fail:

	TALLOC_FREE(hdr);
	return ret;
}

/*
 * Fetch a record and parse it
 */
int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
		TDB_DATA key, bool local_copy,
		void (*parser)(TDB_DATA key, TDB_DATA data,
			       void *private_data),
		void *private_data)
{
	struct ctdb_req_call_old req;
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply;
	struct iovec iov[2];
	ssize_t nwritten;
	uint32_t flags;
	int ret;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	flags = local_copy ? CTDB_WANT_READONLY : 0;

	ZERO_STRUCT(req);

	req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
	req.hdr.ctdb_magic = CTDB_MAGIC;
	req.hdr.ctdb_version = CTDB_PROTOCOL;
	req.hdr.operation = CTDB_REQ_CALL;
	req.hdr.reqid = ctdbd_next_reqid(conn);
	req.flags = flags;
	req.callid = CTDB_FETCH_FUNC;
	req.db_id = db_id;
	req.keylen = key.dsize;

	iov[0].iov_base = &req;
	iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	iov[1].iov_base = key.dptr;
	iov[1].iov_len = key.dsize;

	nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
	if (nwritten == -1) {
		DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
		cluster_fatal("cluster dispatch daemon msg write error\n");
	}

	ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
	if (ret != 0) {
		DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
		goto fail;
	}

	if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
		DEBUG(0, ("received invalid reply\n"));
		ret = EIO;
		goto fail;
	}
	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		ret = ENOENT;
		goto fail;
	}

	parser(key, make_tdb_data(&reply->data[0], reply->datalen),
	       private_data);

	ret = 0;
fail:
	TALLOC_FREE(hdr);
	return ret;
}
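
/*
 * Example (illustrative sketch): fetching a record and handing it to a
 * parser callback. The parser only sees the data while the reply packet is
 * alive, so it must copy anything it wants to keep. The callback name and
 * the copy target are hypothetical.
 *
 *	static void example_parser(TDB_DATA key, TDB_DATA data,
 *				   void *private_data)
 *	{
 *		TDB_DATA *copy = private_data;
 *		copy->dptr = (uint8_t *)talloc_memdup(talloc_tos(),
 *						      data.dptr, data.dsize);
 *		copy->dsize = data.dsize;
 *	}
 *
 *	TDB_DATA copy = { .dptr = NULL, .dsize = 0 };
 *	ret = ctdbd_parse(conn, db_id, key, false, example_parser, &copy);
 *	if (ret == ENOENT) {
 *		... record does not exist ...
 *	}
 */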
/*
  Traverse a ctdb database. "conn" must be an otherwise unused
  ctdb_connection where no other messages but the traverse ones are
  expected.
*/

int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
		   void (*fn)(TDB_DATA key, TDB_DATA data,
			      void *private_data),
		   void *private_data)
{
	int ret;
	TDB_DATA key, data;
	struct ctdb_traverse_start t;
	int32_t cstatus = 0;

	if (ctdbd_conn_has_async_reqs(conn)) {
		/*
		 * Can't use sync call while an async call is in flight. Adding
		 * this check as a safety net. We'll be using different
		 * connections for sync and async requests, so this shouldn't
		 * happen, but who knows...
		 */
		DBG_ERR("Async ctdb req on sync connection\n");
		return EINVAL;
	}

	t.db_id = db_id;
	t.srvid = conn->rand_srvid;
	t.reqid = ctdbd_next_reqid(conn);

	data.dptr = (uint8_t *)&t;
	data.dsize = sizeof(t);

	ret = ctdbd_control_local(conn, CTDB_CONTROL_TRAVERSE_START,
				  conn->rand_srvid,
				  0, data, NULL, NULL, &cstatus);

	if ((ret != 0) || (cstatus != 0)) {
		DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret),
			 cstatus));

		if (ret == 0) {
			/*
			 * We need a mapping here
			 */
			ret = EIO;
		}
		return ret;
	}

	while (true) {
		struct ctdb_req_header *hdr = NULL;
		struct ctdb_req_message_old *m;
		struct ctdb_rec_data_old *d;

		ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
		if (ret != 0) {
			DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret));
			cluster_fatal("failed to read data from ctdbd\n");
		}
		SMB_ASSERT(hdr != NULL);

		if (hdr->operation != CTDB_REQ_MESSAGE) {
			DEBUG(0, ("Got operation %u, expected a message\n",
				  (unsigned)hdr->operation));
			return EIO;
		}

		m = (struct ctdb_req_message_old *)hdr;
		d = (struct ctdb_rec_data_old *)&m->data[0];
		if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
			DEBUG(0, ("Got invalid traverse data of length %d\n",
				  (int)m->datalen));
			return EIO;
		}

		key.dsize = d->keylen;
		key.dptr = &d->data[0];
		data.dsize = d->datalen;
		data.dptr = &d->data[d->keylen];

		if (key.dsize == 0 && data.dsize == 0) {
			/* end of traverse */
			return 0;
		}

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(0, ("Got invalid ltdb header length %d\n",
				  (int)data.dsize));
			return EIO;
		}
		data.dsize -= sizeof(struct ctdb_ltdb_header);
		data.dptr += sizeof(struct ctdb_ltdb_header);

		if (fn != NULL) {
			fn(key, data, private_data);
		}
	}
	return 0;
}
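
/*
 * Example (illustrative sketch): counting the records of a database with a
 * traverse callback. As noted above, the connection used here must not
 * carry any other traffic while the traverse is running. The callback name
 * is hypothetical.
 *
 *	static void count_fn(TDB_DATA key, TDB_DATA data, void *private_data)
 *	{
 *		size_t *count = private_data;
 *		*count += 1;
 *	}
 *
 *	size_t count = 0;
 *	ret = ctdbd_traverse(conn, db_id, count_fn, &count);
 */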
/*
  This is used to canonicalize a ctdb_sock_addr structure.
*/
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)out;
		if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			out4->sin_len = sizeof(*out);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port = in6->sin6_port;
			memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
		}
	}
#endif
}

/*
 * Register us as a server for a particular tcp connection
 */

int ctdbd_register_ips(struct ctdbd_connection *conn,
		       const struct sockaddr_storage *_server,
		       const struct sockaddr_storage *_client,
		       int (*cb)(struct tevent_context *ev,
				 uint32_t src_vnn, uint32_t dst_vnn,
				 uint64_t dst_srvid,
				 const uint8_t *msg, size_t msglen,
				 void *private_data),
		       void *private_data)
{
	struct ctdb_connection p;
	TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
	int ret;
	struct sockaddr_storage client;
	struct sockaddr_storage server;

	/*
	 * Only one connection so far
	 */

	smbd_ctdb_canonicalize_ip(_client, &client);
	smbd_ctdb_canonicalize_ip(_server, &server);

	switch (client.ss_family) {
	case AF_INET:
		memcpy(&p.dst.ip, &server, sizeof(p.dst.ip));
		memcpy(&p.src.ip, &client, sizeof(p.src.ip));
		break;
	case AF_INET6:
		memcpy(&p.dst.ip6, &server, sizeof(p.dst.ip6));
		memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
		break;
	default:
		return EIO;
	}

	/*
	 * We want to be told about IP releases
	 */

	ret = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
				  cb, private_data);
	if (ret != 0) {
		return ret;
	}

	/*
	 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
	 * can send an extra ack to trigger a reset for our client, so it
	 * immediately reconnects
	 */
	ret = ctdbd_control_local(conn,
				  CTDB_CONTROL_TCP_CLIENT, 0,
				  CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL,
				  NULL);
	if (ret != 0) {
		return ret;
	}
	return 0;
}
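
/*
 * Example (illustrative sketch): letting ctdbd know about a client TCP
 * connection and asking to be told when the public IP it uses is released.
 * The callback signature matches register_with_ctdbd(); the
 * release_ip_handler name and its private data are hypothetical.
 *
 *	ret = ctdbd_register_ips(conn, &server_ss, &client_ss,
 *				 release_ip_handler, xconn);
 */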
int ctdbd_control_get_public_ips(struct ctdbd_connection *conn,
				 uint32_t flags,
				 TALLOC_CTX *mem_ctx,
				 struct ctdb_public_ip_list_old **_ips)
{
	struct ctdb_public_ip_list_old *ips = NULL;
	TDB_DATA outdata;
	int32_t cstatus = -1;
	size_t min_dsize;
	size_t max_ips;
	int ret;

	*_ips = NULL;

	ret = ctdbd_control_local(conn,
				  CTDB_CONTROL_GET_PUBLIC_IPS,
				  0, /* srvid */
				  flags,
				  tdb_null, /* indata */
				  mem_ctx,
				  &outdata,
				  &cstatus);
	if (ret != 0 || cstatus != 0) {
		DBG_ERR("ctdb_control for getpublicips failed ret:%d cstatus:%d\n",
			ret, (int)cstatus);
		return -1;
	}

	min_dsize = offsetof(struct ctdb_public_ip_list_old, ips);
	if (outdata.dsize < min_dsize) {
		DBG_ERR("outdata.dsize=%zu < min_dsize=%zu\n",
			outdata.dsize, min_dsize);
		return -1;
	}
	max_ips = (outdata.dsize - min_dsize)/sizeof(struct ctdb_public_ip);
	ips = (struct ctdb_public_ip_list_old *)outdata.dptr;
	if ((size_t)ips->num > max_ips) {
		DBG_ERR("ips->num=%zu > max_ips=%zu\n",
			(size_t)ips->num, max_ips);
		return -1;
	}

	*_ips = ips;
	return 0;
}

bool ctdbd_find_in_public_ips(const struct ctdb_public_ip_list_old *ips,
			      const struct sockaddr_storage *ip)
{
	uint32_t i;

	for (i=0; i < ips->num; i++) {
		struct samba_sockaddr tmp = {
			.u = {
				.ss = *ip,
			},
		};
		bool match;

		match = sockaddr_equal(&ips->ips[i].addr.sa,
				       &tmp.u.sa);
		if (match) {
			return true;
		}
	}

	return false;
}

/*
  call a control on the local node
 */
int ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
			uint64_t srvid, uint32_t flags, TDB_DATA data,
			TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
			int32_t *cstatus)
{
	return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data,
			     mem_ctx, outdata, cstatus);
}
int ctdb_watch_us(struct ctdbd_connection *conn)
{
	struct ctdb_notify_data_old reg_data;
	size_t struct_len;
	int ret;
	int32_t cstatus;

	reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
	reg_data.len = 1;
	reg_data.notify_data[0] = 0;

	struct_len = offsetof(struct ctdb_notify_data_old,
			      notify_data) + reg_data.len;

	ret = ctdbd_control_local(
		conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
		make_tdb_data((uint8_t *)&reg_data, struct_len),
		NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control_local failed: %s\n",
			  strerror(ret)));
	}
	return ret;
}

int ctdb_unwatch(struct ctdbd_connection *conn)
{
	uint64_t srvid = CTDB_SRVID_SAMBA_NOTIFY;
	int ret;
	int32_t cstatus;

	ret = ctdbd_control_local(
		conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
		make_tdb_data((uint8_t *)&srvid, sizeof(srvid)),
		NULL, NULL, &cstatus);
	if (ret != 0) {
		DEBUG(1, ("ctdbd_control_local failed: %s\n",
			  strerror(ret)));
	}
	return ret;
}

int ctdbd_probe(const char *sockname, int timeout)
{
	/*
	 * Do a very early check if ctdbd is around to avoid an abort and core
	 * later
	 */
	struct ctdbd_connection *conn = NULL;
	int ret;

	ret = ctdbd_init_connection(talloc_tos(), sockname, timeout,
				    &conn);

	/*
	 * We only care if we can connect.
	 */
	TALLOC_FREE(conn);

	return ret;
}

static int ctdbd_connection_destructor(struct ctdbd_connection *c)
{
	if (c->fd != -1) {
		close(c->fd);
		c->fd = -1;
	}
	return 0;
}

void ctdbd_prep_hdr_next_reqid(
	struct ctdbd_connection *conn, struct ctdb_req_header *hdr)
{
	*hdr = (struct ctdb_req_header) {
		.ctdb_magic = CTDB_MAGIC,
		.ctdb_version = CTDB_PROTOCOL,
		.reqid = ctdbd_next_reqid(conn),
		.destnode = CTDB_CURRENT_NODE,
	};
}
struct ctdbd_pkt_read_state {
	uint8_t *pkt;
};

static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data);
static void ctdbd_pkt_read_done(struct tevent_req *subreq);

static struct tevent_req *ctdbd_pkt_read_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
{
	struct tevent_req *req = NULL, *subreq = NULL;
	struct ctdbd_pkt_read_state *state = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
	if (req == NULL) {
		return NULL;
	}
	subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
	return req;
}

static ssize_t ctdbd_pkt_read_more(
	uint8_t *buf, size_t buflen, void *private_data)
{
	uint32_t msglen;
	if (buflen < 4) {
		return -1;
	}
	if (buflen > 4) {
		return 0;	/* Been here, done */
	}
	memcpy(&msglen, buf, 4);

	if (msglen < sizeof(struct ctdb_req_header)) {
		return -1;
	}
	return msglen - sizeof(msglen);
}

static void ctdbd_pkt_read_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_pkt_read_state *state = tevent_req_data(
		req, struct ctdbd_pkt_read_state);
	ssize_t nread;
	int err;

	nread = read_packet_recv(subreq, state, &state->pkt, &err);
	TALLOC_FREE(subreq);
	if (nread == -1) {
		tevent_req_error(req, err);
		return;
	}
	tevent_req_done(req);
}

static int ctdbd_pkt_read_recv(
	struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
{
	struct ctdbd_pkt_read_state *state = tevent_req_data(
		req, struct ctdbd_pkt_read_state);
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		return err;
	}
	*pkt = talloc_move(mem_ctx, &state->pkt);
	tevent_req_received(req);
	return 0;
}
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
static void ctdbd_conn_received(struct tevent_req *subreq);

struct ctdbd_req_state {
	struct ctdbd_connection *conn;
	struct tevent_context *ev;
	uint32_t reqid;
	struct ctdb_req_header *reply;
};

static void ctdbd_req_unset_pending(struct tevent_req *req)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	struct ctdbd_connection *conn = state->conn;
	size_t num_pending = talloc_array_length(conn->pending);
	size_t i, num_after;

	tevent_req_set_cleanup_fn(req, NULL);

	if (num_pending == 1) {
		/*
		 * conn->read_req is a child of conn->pending
		 */
		TALLOC_FREE(conn->pending);
		conn->read_req = NULL;
		return;
	}

	for (i=0; i<num_pending; i++) {
		if (req == conn->pending[i]) {
			break;
		}
	}
	if (i == num_pending) {
		/*
		 * Something's seriously broken. Just returning here is the
		 * right thing nevertheless, the point of this routine is to
		 * remove ourselves from conn->pending.
		 */
		return;
	}

	num_after = num_pending - i - 1;
	if (num_after > 0) {
		memmove(&conn->pending[i],
			&conn->pending[i] + 1,
			sizeof(*conn->pending) * num_after);
	}
	conn->pending = talloc_realloc(
		NULL, conn->pending, struct tevent_req *, num_pending - 1);
}

static void ctdbd_req_cleanup(
	struct tevent_req *req, enum tevent_req_state req_state)
{
	ctdbd_req_unset_pending(req);
}

static bool ctdbd_req_set_pending(struct tevent_req *req)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	struct ctdbd_connection *conn = state->conn;
	struct tevent_req **pending = NULL;
	size_t num_pending = talloc_array_length(conn->pending);
	bool ok;

	pending = talloc_realloc(
		conn, conn->pending, struct tevent_req *, num_pending + 1);
	if (pending == NULL) {
		return false;
	}
	pending[num_pending] = req;
	conn->pending = pending;

	tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);

	ok = ctdbd_conn_receive_next(conn);
	if (!ok) {
		ctdbd_req_unset_pending(req);
		return false;
	}

	return true;
}
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
{
	size_t num_pending = talloc_array_length(conn->pending);
	struct tevent_req *req = NULL;
	struct ctdbd_req_state *state = NULL;

	if (conn->read_req != NULL) {
		return true;
	}
	if (num_pending == 0) {
		/*
		 * done for now
		 */
		return true;
	}

	req = conn->pending[0];
	state = tevent_req_data(req, struct ctdbd_req_state);

	conn->read_req = ctdbd_pkt_read_send(
		conn->pending, state->ev, conn->fd);
	if (conn->read_req == NULL) {
		return false;
	}
	tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
	return true;
}

static void ctdbd_conn_received(struct tevent_req *subreq)
{
	struct ctdbd_connection *conn = tevent_req_callback_data(
		subreq, struct ctdbd_connection);
	TALLOC_CTX *frame = talloc_stackframe();
	uint8_t *pkt = NULL;
	int ret;
	struct ctdb_req_header *hdr = NULL;
	uint32_t reqid;
	struct tevent_req *req = NULL;
	struct ctdbd_req_state *state = NULL;
	size_t i, num_pending;
	bool ok;

	SMB_ASSERT(subreq == conn->read_req);
	conn->read_req = NULL;

	ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		cluster_fatal("ctdbd_pkt_read failed\n");
	}

	hdr = (struct ctdb_req_header *)pkt;
	reqid = hdr->reqid;
	num_pending = talloc_array_length(conn->pending);

	for (i=0; i<num_pending; i++) {
		req = conn->pending[i];
		state = tevent_req_data(req, struct ctdbd_req_state);
		if (state->reqid == reqid) {
			break;
		}
	}

	if (i == num_pending) {
		/* not found */
		TALLOC_FREE(frame);
		return;
	}

	state->reply = talloc_move(state, &hdr);
	tevent_req_defer_callback(req, state->ev);
	tevent_req_done(req);

	TALLOC_FREE(frame);

	ok = ctdbd_conn_receive_next(conn);
	if (!ok) {
		cluster_fatal("ctdbd_conn_receive_next failed\n");
	}
}
static void ctdbd_req_written(struct tevent_req *subreq);

struct tevent_req *ctdbd_req_send(
	TALLOC_CTX *mem_ctx,
	struct tevent_context *ev,
	struct ctdbd_connection *conn,
	struct iovec *iov,
	size_t num_iov)
{
	struct tevent_req *req = NULL, *subreq = NULL;
	struct ctdbd_req_state *state = NULL;
	struct ctdb_req_header *hdr = NULL;
	bool ok;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
	if (req == NULL) {
		return NULL;
	}
	state->conn = conn;
	state->ev = ev;

	if ((num_iov == 0) ||
	    (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
		tevent_req_error(req, EINVAL);
		return tevent_req_post(req, ev);
	}
	hdr = iov[0].iov_base;
	state->reqid = hdr->reqid;

	ok = ctdbd_req_set_pending(req);
	if (!ok) {
		tevent_req_oom(req);
		return tevent_req_post(req, ev);
	}

	subreq = writev_send(
		state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_req_written, req);

	return req;
}

static void ctdbd_req_written(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	ssize_t nwritten;
	int err;

	nwritten = writev_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (nwritten == -1) {
		tevent_req_error(req, err);
		return;
	}
}

int ctdbd_req_recv(
	struct tevent_req *req,
	TALLOC_CTX *mem_ctx,
	struct ctdb_req_header **reply)
{
	struct ctdbd_req_state *state = tevent_req_data(
		req, struct ctdbd_req_state);
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		return err;
	}
	*reply = talloc_move(mem_ctx, &state->reply);
	tevent_req_received(req);
	return 0;
}
struct ctdbd_parse_state {
	struct tevent_context *ev;
	struct ctdbd_connection *conn;
	uint32_t reqid;
	TDB_DATA key;
	uint8_t _keybuf[64];
	struct ctdb_req_call_old ctdb_req;
	struct iovec iov[2];
	void (*parser)(TDB_DATA key,
		       TDB_DATA data,
		       void *private_data);
	void *private_data;
};

static void ctdbd_parse_done(struct tevent_req *subreq);

struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdbd_connection *conn,
				    uint32_t db_id,
				    TDB_DATA key,
				    bool local_copy,
				    void (*parser)(TDB_DATA key,
						   TDB_DATA data,
						   void *private_data),
				    void *private_data,
				    enum dbwrap_req_state *req_state)
{
	struct tevent_req *req = NULL;
	struct ctdbd_parse_state *state = NULL;
	uint32_t flags;
	uint32_t packet_length;
	struct tevent_req *subreq = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
	if (req == NULL) {
		*req_state = DBWRAP_REQ_ERROR;
		return NULL;
	}

	*req_state = DBWRAP_REQ_DISPATCHED;

	*state = (struct ctdbd_parse_state) {
		.ev = ev,
		.conn = conn,
		.reqid = ctdbd_next_reqid(conn),
		.parser = parser,
		.private_data = private_data,
	};

	flags = local_copy ? CTDB_WANT_READONLY : 0;
	packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;

	/*
	 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires
	 * that all passed iov elements have a lifetime longer than the
	 * tevent_req returned by ctdb_pkt_send_send(). This is required to
	 * continue sending the low level request into the ctdb socket, if a
	 * higher level ('this') request is canceled (or talloc free'd) by the
	 * application layer, without sending invalid packets to ctdb.
	 */
	if (key.dsize > sizeof(state->_keybuf)) {
		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
		if (tevent_req_nomem(state->key.dptr, req)) {
			return tevent_req_post(req, ev);
		}
	} else {
		memcpy(state->_keybuf, key.dptr, key.dsize);
		state->key.dptr = state->_keybuf;
	}
	state->key.dsize = key.dsize;

	state->ctdb_req.hdr.length = packet_length;
	state->ctdb_req.hdr.ctdb_magic = CTDB_MAGIC;
	state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
	state->ctdb_req.hdr.operation = CTDB_REQ_CALL;
	state->ctdb_req.hdr.reqid = state->reqid;
	state->ctdb_req.flags = flags;
	state->ctdb_req.callid = CTDB_FETCH_FUNC;
	state->ctdb_req.db_id = db_id;
	state->ctdb_req.keylen = state->key.dsize;

	state->iov[0].iov_base = &state->ctdb_req;
	state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	state->iov[1].iov_base = state->key.dptr;
	state->iov[1].iov_len = state->key.dsize;

	subreq = ctdbd_req_send(
		state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
	if (tevent_req_nomem(subreq, req)) {
		*req_state = DBWRAP_REQ_ERROR;
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_parse_done, req);

	return req;
}
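
/*
 * Example (illustrative sketch): asynchronous fetch-and-parse using the
 * tevent send/recv pattern. "ev" is a tevent context and "conn" must be a
 * connection created with ctdbd_init_async_connection(); the callback and
 * state names are hypothetical.
 *
 *	enum dbwrap_req_state req_state;
 *	struct tevent_req *subreq;
 *
 *	subreq = ctdbd_parse_send(state, ev, conn, db_id, key, false,
 *				  example_parser, state, &req_state);
 *	if (subreq == NULL) {
 *		... handle ENOMEM ...
 *	}
 *	tevent_req_set_callback(subreq, example_parse_done, req);
 *
 *	static void example_parse_done(struct tevent_req *subreq)
 *	{
 *		int ret = ctdbd_parse_recv(subreq);
 *		TALLOC_FREE(subreq);
 *		... example_parser() has run by now if ret == 0 ...
 *	}
 */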
static void ctdbd_parse_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_parse_state *state = tevent_req_data(
		req, struct ctdbd_parse_state);
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply = NULL;
	int ret;

	ret = ctdbd_req_recv(subreq, state, &hdr);
	TALLOC_FREE(subreq);
	if (tevent_req_error(req, ret)) {
		DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
		return;
	}
	SMB_ASSERT(hdr != NULL);

	if (hdr->operation != CTDB_REPLY_CALL) {
		DBG_ERR("received invalid reply\n");
		ctdb_packet_dump(hdr);
		tevent_req_error(req, EIO);
		return;
	}

	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		tevent_req_error(req, ENOENT);
		return;
	}

	state->parser(state->key,
		      make_tdb_data(&reply->data[0], reply->datalen),
		      state->private_data);

	tevent_req_done(req);
	return;
}

int ctdbd_parse_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}