lib: Store ctdb_timeout in ctdb_connection
[Samba.git] / source3 / lib / ctdbd_conn.c
blob07d1282dc31c16961e8ab713bceb20fbe522251f
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "includes.h"
22 #include "util_tdb.h"
23 #include "serverid.h"
24 #include "ctdbd_conn.h"
25 #include "system/select.h"
26 #include "lib/sys_rw_data.h"
27 #include "lib/util/iov_buf.h"
29 #include "messages.h"
31 /* paths to these include files come from --with-ctdb= in configure */
33 #include "ctdb.h"
34 #include "ctdb_private.h"
/*
 * One registered message handler. ctdbd_msg_call_back() invokes "cb"
 * with "private_data" for every incoming message whose srvid equals
 * "srvid"; entries with a NULL "cb" are skipped there.
 */
struct ctdbd_srvid_cb {
	uint64_t srvid;
	int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
		  uint64_t dst_srvid,
		  const uint8_t *msg, size_t msglen,
		  void *private_data);
	void *private_data;
};
/*
 * State of one connection to ctdbd.
 */
struct ctdbd_connection {
	/* Set by ctdbd_register_msg_ctx(), NULL before that */
	struct messaging_context *msg_ctx;
	/* Last request id handed out by ctdbd_next_reqid() */
	uint32_t reqid;
	/* Filled in by get_cluster_vnn() during connection setup */
	uint32_t our_vnn;
	/* Random srvid generated and registered in ctdbd_init_connection() */
	uint64_t rand_srvid;
	/* talloc array of handlers, grown by register_with_ctdbd() */
	struct ctdbd_srvid_cb *callbacks;
	/* Socket connected to ctdbd, closed by the talloc destructor */
	int fd;
	/* Read event on "fd", created by ctdbd_register_msg_ctx() */
	struct tevent_fd *fde;
	/* lp_ctdb_timeout(); -1 makes ctdb_read_packet() skip its poll */
	int timeout;
};
56 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
58 conn->reqid += 1;
59 if (conn->reqid == 0) {
60 conn->reqid += 1;
62 return conn->reqid;
65 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
66 uint32_t vnn, uint32_t opcode,
67 uint64_t srvid, uint32_t flags, TDB_DATA data,
68 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
69 int *cstatus);
/*
 * Terminate the process after a fatal communication error with the
 * ctdbd daemon. "why" is only used for the log message.
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));

	/*
	 * smb_panic() is deliberately avoided: writing a core file would
	 * delay the exit. The process id has to be released immediately
	 * so that someone else can take over without getting sharing
	 * violations.
	 */
	_exit(1);
}
87 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
89 if (DEBUGLEVEL < 11) {
90 return;
92 DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
93 (int)hdr->length, (int)hdr->ctdb_magic,
94 (int)hdr->ctdb_version, (int)hdr->generation,
95 (int)hdr->operation, (int)hdr->reqid));
99 * Register a srvid with ctdbd
101 NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
102 int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
103 uint64_t dst_srvid,
104 const uint8_t *msg, size_t msglen,
105 void *private_data),
106 void *private_data)
109 NTSTATUS status;
110 int cstatus;
111 size_t num_callbacks;
112 struct ctdbd_srvid_cb *tmp;
114 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
115 CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
116 tdb_null, NULL, NULL, &cstatus);
117 if (!NT_STATUS_IS_OK(status)) {
118 return status;
121 num_callbacks = talloc_array_length(conn->callbacks);
123 tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
124 num_callbacks + 1);
125 if (tmp == NULL) {
126 return NT_STATUS_NO_MEMORY;
128 conn->callbacks = tmp;
130 conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
131 .srvid = srvid, .cb = cb, .private_data = private_data
134 return NT_STATUS_OK;
137 static int ctdbd_msg_call_back(struct ctdbd_connection *conn,
138 struct ctdb_req_message *msg)
140 size_t msg_len;
141 size_t i, num_callbacks;
143 msg_len = msg->hdr.length;
144 if (msg_len < offsetof(struct ctdb_req_message, data)) {
145 DEBUG(10, ("%s: len %u too small\n", __func__,
146 (unsigned)msg_len));
147 return 0;
149 msg_len -= offsetof(struct ctdb_req_message, data);
151 if (msg_len < msg->datalen) {
152 DEBUG(10, ("%s: msg_len=%u < msg->datalen=%u\n", __func__,
153 (unsigned)msg_len, (unsigned)msg->datalen));
154 return 0;
157 num_callbacks = talloc_array_length(conn->callbacks);
159 for (i=0; i<num_callbacks; i++) {
160 struct ctdbd_srvid_cb *cb = &conn->callbacks[i];
162 if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
163 int ret;
165 ret = cb->cb(msg->hdr.srcnode, msg->hdr.destnode,
166 msg->srvid, msg->data, msg->datalen,
167 cb->private_data);
168 if (ret != 0) {
169 return ret;
173 return 0;
177 * get our vnn from the cluster
179 static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
181 int32_t cstatus=-1;
182 NTSTATUS status;
183 status = ctdbd_control(conn,
184 CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
185 tdb_null, NULL, NULL, &cstatus);
186 if (!NT_STATUS_IS_OK(status)) {
187 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
188 return status;
190 *vnn = (uint32_t)cstatus;
191 return status;
195 * Are we active (i.e. not banned or stopped?)
197 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
199 int32_t cstatus=-1;
200 NTSTATUS status;
201 TDB_DATA outdata;
202 struct ctdb_node_map *m;
203 uint32_t failure_flags;
204 bool ret = false;
205 int i;
207 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
208 CTDB_CONTROL_GET_NODEMAP, 0, 0,
209 tdb_null, talloc_tos(), &outdata, &cstatus);
210 if (!NT_STATUS_IS_OK(status)) {
211 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
212 return false;
214 if ((cstatus != 0) || (outdata.dptr == NULL)) {
215 DEBUG(2, ("Received invalid ctdb data\n"));
216 return false;
219 m = (struct ctdb_node_map *)outdata.dptr;
221 for (i=0; i<m->num; i++) {
222 if (vnn == m->nodes[i].pnn) {
223 break;
227 if (i == m->num) {
228 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
229 (int)vnn));
230 goto fail;
233 failure_flags = NODE_FLAGS_BANNED | NODE_FLAGS_DISCONNECTED
234 | NODE_FLAGS_PERMANENTLY_DISABLED | NODE_FLAGS_STOPPED;
236 if ((m->nodes[i].flags & failure_flags) != 0) {
237 DEBUG(2, ("Node has status %x, not active\n",
238 (int)m->nodes[i].flags));
239 goto fail;
242 ret = true;
243 fail:
244 TALLOC_FREE(outdata.dptr);
245 return ret;
248 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
250 return conn->our_vnn;
253 const char *lp_ctdbd_socket(void)
255 const char *ret;
257 ret = lp__ctdbd_socket();
258 if (ret != NULL && strlen(ret) > 0) {
259 return ret;
262 return CTDB_SOCKET;
/*
 * Open a unix domain stream socket and connect it to "sockname".
 *
 * On success 0 is returned and the descriptor is stored in *pfd. On
 * failure an errno value is returned (ENAMETOOLONG if the path does not
 * fit into sun_path), the socket is closed and *pfd is left untouched.
 */
static int ctdbd_connect(const char *sockname, int *pfd)
{
	struct sockaddr_un addr = { 0, };
	size_t copied;
	int sock;
	int err;

	sock = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock == -1) {
		err = errno;
		DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
		return err;
	}

	addr.sun_family = AF_UNIX;

	copied = strlcpy(addr.sun_path, sockname, sizeof(addr.sun_path));
	if (copied >= sizeof(addr.sun_path)) {
		DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
			  sockname));
		close(sock);
		return ENAMETOOLONG;
	}

	if (connect(sock, (struct sockaddr *)(void *)&addr,
		    sizeof(struct sockaddr_un)) == -1) {
		/* remember errno before DEBUG/close can overwrite it */
		err = errno;
		DEBUG(1, ("connect(%s) failed: %s\n", sockname,
			  strerror(err)));
		close(sock);
		return err;
	}

	*pfd = sock;
	return 0;
}
307 static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
308 struct ctdb_req_header **result)
310 struct ctdb_req_header *req;
311 int ret, revents;
312 uint32_t msglen;
313 ssize_t nread;
315 if (timeout != -1) {
316 ret = poll_one_fd(fd, POLLIN, timeout, &revents);
317 if (ret == -1) {
318 return errno;
320 if (ret == 0) {
321 return ETIMEDOUT;
323 if (ret != 1) {
324 return EIO;
328 nread = read_data(fd, &msglen, sizeof(msglen));
329 if (nread == -1) {
330 return errno;
332 if (nread == 0) {
333 return EIO;
336 if (msglen < sizeof(struct ctdb_req_header)) {
337 return EIO;
340 req = talloc_size(mem_ctx, msglen);
341 if (req == NULL) {
342 return ENOMEM;
344 talloc_set_name_const(req, "struct ctdb_req_header");
346 req->length = msglen;
348 nread = read_data(fd, ((char *)req) + sizeof(msglen),
349 msglen - sizeof(msglen));
350 if (nread == -1) {
351 return errno;
353 if (nread == 0) {
354 return EIO;
357 *result = req;
358 return 0;
362 * Read a full ctdbd request. If we have a messaging context, defer incoming
363 * messages that might come in between.
366 static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
367 TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
369 struct ctdb_req_header *hdr;
370 int ret;
372 next_pkt:
374 ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
375 if (ret != 0) {
376 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
377 cluster_fatal("ctdbd died\n");
380 DEBUG(11, ("Received ctdb packet\n"));
381 ctdb_packet_dump(hdr);
383 if (hdr->operation == CTDB_REQ_MESSAGE) {
384 struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
386 if (conn->msg_ctx == NULL) {
387 DEBUG(1, ("Got a message without having a msg ctx, "
388 "dropping msg %llu\n",
389 (long long unsigned)msg->srvid));
390 TALLOC_FREE(hdr);
391 goto next_pkt;
394 ret = ctdbd_msg_call_back(conn, msg);
395 if (ret != 0) {
396 TALLOC_FREE(hdr);
397 return ret;
400 TALLOC_FREE(hdr);
401 goto next_pkt;
404 if ((reqid != 0) && (hdr->reqid != reqid)) {
405 /* we got the wrong reply */
406 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
407 "been %u\n", hdr->reqid, reqid));
408 TALLOC_FREE(hdr);
409 goto next_pkt;
412 *result = talloc_move(mem_ctx, &hdr);
414 return 0;
417 static int ctdbd_connection_destructor(struct ctdbd_connection *c)
419 close(c->fd);
420 return 0;
423 * Get us a ctdbd connection
426 static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
427 struct ctdbd_connection **pconn)
429 const char *sockname = lp_ctdbd_socket();
430 struct ctdbd_connection *conn;
431 int ret;
432 NTSTATUS status;
434 if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
435 DEBUG(0, ("talloc failed\n"));
436 return NT_STATUS_NO_MEMORY;
439 conn->timeout = lp_ctdb_timeout();
441 if (conn->timeout == 0) {
442 conn->timeout = -1;
445 ret = ctdbd_connect(sockname, &conn->fd);
446 if (ret != 0) {
447 status = map_nt_error_from_unix(ret);
448 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
449 goto fail;
451 talloc_set_destructor(conn, ctdbd_connection_destructor);
453 status = get_cluster_vnn(conn, &conn->our_vnn);
455 if (!NT_STATUS_IS_OK(status)) {
456 DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status)));
457 goto fail;
460 if (!ctdbd_working(conn, conn->our_vnn)) {
461 DEBUG(2, ("Node is not working, can not connect\n"));
462 status = NT_STATUS_INTERNAL_DB_ERROR;
463 goto fail;
466 generate_random_buffer((unsigned char *)&conn->rand_srvid,
467 sizeof(conn->rand_srvid));
469 status = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
471 if (!NT_STATUS_IS_OK(status)) {
472 DEBUG(5, ("Could not register random srvid: %s\n",
473 nt_errstr(status)));
474 goto fail;
477 *pconn = conn;
478 return NT_STATUS_OK;
480 fail:
481 TALLOC_FREE(conn);
482 return status;
486 * Get us a ctdbd connection and register us as a process
489 NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
490 struct ctdbd_connection **pconn)
492 struct ctdbd_connection *conn;
493 NTSTATUS status;
495 status = ctdbd_init_connection(mem_ctx, &conn);
497 if (!NT_STATUS_IS_OK(status)) {
498 return status;
501 status = register_with_ctdbd(conn, MSG_SRVID_SAMBA, NULL, NULL);
502 if (!NT_STATUS_IS_OK(status)) {
503 goto fail;
506 *pconn = conn;
507 return NT_STATUS_OK;
509 fail:
510 TALLOC_FREE(conn);
511 return status;
514 struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
516 return conn->msg_ctx;
519 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
521 return conn->fd;
525 * Packet handler to receive and handle a ctdb message
527 static int ctdb_handle_message(struct ctdbd_connection *conn,
528 struct ctdb_req_header *hdr)
530 struct ctdb_req_message *msg;
532 if (hdr->operation != CTDB_REQ_MESSAGE) {
533 DEBUG(0, ("Received async msg of type %u, discarding\n",
534 hdr->operation));
535 return EINVAL;
538 msg = (struct ctdb_req_message *)hdr;
540 ctdbd_msg_call_back(conn, msg);
542 return 0;
546 * The ctdbd socket is readable asynchronuously
549 static void ctdbd_socket_handler(struct tevent_context *event_ctx,
550 struct tevent_fd *event,
551 uint16_t flags,
552 void *private_data)
554 struct ctdbd_connection *conn = talloc_get_type_abort(
555 private_data, struct ctdbd_connection);
556 struct ctdb_req_header *hdr = NULL;
557 int ret;
559 ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
560 if (ret != 0) {
561 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
562 cluster_fatal("ctdbd died\n");
565 ret = ctdb_handle_message(conn, hdr);
567 TALLOC_FREE(hdr);
569 if (ret != 0) {
570 DEBUG(10, ("could not handle incoming message: %s\n",
571 strerror(ret)));
576 * Prepare a ctdbd connection to receive messages
579 NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
580 struct messaging_context *msg_ctx)
582 SMB_ASSERT(conn->msg_ctx == NULL);
583 SMB_ASSERT(conn->fde == NULL);
585 if (!(conn->fde = tevent_add_fd(messaging_tevent_context(msg_ctx),
586 conn,
587 conn->fd,
588 TEVENT_FD_READ,
589 ctdbd_socket_handler,
590 conn))) {
591 DEBUG(0, ("event_add_fd failed\n"));
592 return NT_STATUS_NO_MEMORY;
595 conn->msg_ctx = msg_ctx;
597 return NT_STATUS_OK;
600 NTSTATUS ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
601 uint32_t dst_vnn, uint64_t dst_srvid,
602 const struct iovec *iov, int iovlen)
604 struct ctdb_req_message r;
605 struct iovec iov2[iovlen+1];
606 size_t buflen = iov_buflen(iov, iovlen);
607 ssize_t nwritten;
609 r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen;
610 r.hdr.ctdb_magic = CTDB_MAGIC;
611 r.hdr.ctdb_version = CTDB_PROTOCOL;
612 r.hdr.generation = 1;
613 r.hdr.operation = CTDB_REQ_MESSAGE;
614 r.hdr.destnode = dst_vnn;
615 r.hdr.srcnode = conn->our_vnn;
616 r.hdr.reqid = 0;
617 r.srvid = dst_srvid;
618 r.datalen = buflen;
620 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
621 ctdb_packet_dump(&r.hdr);
623 iov2[0].iov_base = &r;
624 iov2[0].iov_len = offsetof(struct ctdb_req_message, data);
625 memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));
627 nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
628 if (nwritten == -1) {
629 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
630 cluster_fatal("cluster dispatch daemon msg write error\n");
633 return NT_STATUS_OK;
637 * send/recv a generic ctdb control message
639 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
640 uint32_t vnn, uint32_t opcode,
641 uint64_t srvid, uint32_t flags,
642 TDB_DATA data,
643 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
644 int *cstatus)
646 struct ctdb_req_control req;
647 struct ctdb_req_header *hdr;
648 struct ctdb_reply_control *reply = NULL;
649 struct ctdbd_connection *new_conn = NULL;
650 struct iovec iov[2];
651 ssize_t nwritten;
652 NTSTATUS status;
653 int ret;
655 if (conn == NULL) {
656 status = ctdbd_init_connection(NULL, &new_conn);
658 if (!NT_STATUS_IS_OK(status)) {
659 DEBUG(10, ("Could not init temp connection: %s\n",
660 nt_errstr(status)));
661 goto fail;
664 conn = new_conn;
667 ZERO_STRUCT(req);
668 req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize;
669 req.hdr.ctdb_magic = CTDB_MAGIC;
670 req.hdr.ctdb_version = CTDB_PROTOCOL;
671 req.hdr.operation = CTDB_REQ_CONTROL;
672 req.hdr.reqid = ctdbd_next_reqid(conn);
673 req.hdr.destnode = vnn;
674 req.opcode = opcode;
675 req.srvid = srvid;
676 req.datalen = data.dsize;
677 req.flags = flags;
679 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
680 ctdb_packet_dump(&req.hdr);
682 iov[0].iov_base = &req;
683 iov[0].iov_len = offsetof(struct ctdb_req_control, data);
684 iov[1].iov_base = data.dptr;
685 iov[1].iov_len = data.dsize;
687 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
688 if (nwritten == -1) {
689 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
690 cluster_fatal("cluster dispatch daemon msg write error\n");
693 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
694 TALLOC_FREE(new_conn);
695 if (cstatus) {
696 *cstatus = 0;
698 return NT_STATUS_OK;
701 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
702 if (ret != 0) {
703 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
704 status = map_nt_error_from_unix(ret);
705 goto fail;
708 if (hdr->operation != CTDB_REPLY_CONTROL) {
709 DEBUG(0, ("received invalid reply\n"));
710 goto fail;
712 reply = (struct ctdb_reply_control *)hdr;
714 if (outdata) {
715 if (!(outdata->dptr = (uint8_t *)talloc_memdup(
716 mem_ctx, reply->data, reply->datalen))) {
717 TALLOC_FREE(reply);
718 return NT_STATUS_NO_MEMORY;
720 outdata->dsize = reply->datalen;
722 if (cstatus) {
723 (*cstatus) = reply->status;
726 status = NT_STATUS_OK;
728 fail:
729 TALLOC_FREE(new_conn);
730 TALLOC_FREE(reply);
731 return status;
735 * see if a remote process exists
737 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid)
739 struct server_id id;
740 bool result;
742 id.pid = pid;
743 id.vnn = vnn;
745 if (!ctdb_processes_exist(conn, &id, 1, &result)) {
746 DEBUG(10, ("ctdb_processes_exist failed\n"));
747 return false;
749 return result;
752 bool ctdb_processes_exist(struct ctdbd_connection *conn,
753 const struct server_id *pids, int num_pids,
754 bool *results)
756 TALLOC_CTX *frame = talloc_stackframe();
757 int i, num_received;
758 uint32_t *reqids;
759 bool result = false;
761 reqids = talloc_array(talloc_tos(), uint32_t, num_pids);
762 if (reqids == NULL) {
763 goto fail;
766 for (i=0; i<num_pids; i++) {
767 struct ctdb_req_control req;
768 pid_t pid;
769 struct iovec iov[2];
770 ssize_t nwritten;
772 results[i] = false;
773 reqids[i] = ctdbd_next_reqid(conn);
775 ZERO_STRUCT(req);
778 * pids[i].pid is uint64_t, scale down to pid_t which
779 * is the wire protocol towards ctdb.
781 pid = pids[i].pid;
783 DEBUG(10, ("Requesting PID %d/%d, reqid=%d\n",
784 (int)pids[i].vnn, (int)pid,
785 (int)reqids[i]));
787 req.hdr.length = offsetof(struct ctdb_req_control, data);
788 req.hdr.length += sizeof(pid);
789 req.hdr.ctdb_magic = CTDB_MAGIC;
790 req.hdr.ctdb_version = CTDB_PROTOCOL;
791 req.hdr.operation = CTDB_REQ_CONTROL;
792 req.hdr.reqid = reqids[i];
793 req.hdr.destnode = pids[i].vnn;
794 req.opcode = CTDB_CONTROL_PROCESS_EXISTS;
795 req.srvid = 0;
796 req.datalen = sizeof(pid);
797 req.flags = 0;
799 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
800 ctdb_packet_dump(&req.hdr);
802 iov[0].iov_base = &req;
803 iov[0].iov_len = offsetof(struct ctdb_req_control, data);
804 iov[1].iov_base = &pid;
805 iov[1].iov_len = sizeof(pid);
807 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
808 if (nwritten == -1) {
809 DEBUG(10, ("write_data_iov failed: %s\n",
810 strerror(errno)));
811 goto fail;
815 num_received = 0;
817 while (num_received < num_pids) {
818 struct ctdb_req_header *hdr;
819 struct ctdb_reply_control *reply;
820 uint32_t reqid;
821 int ret;
823 ret = ctdb_read_req(conn, 0, talloc_tos(), &hdr);
824 if (ret != 0) {
825 DEBUG(10, ("ctdb_read_req failed: %s\n",
826 strerror(ret)));
827 goto fail;
830 if (hdr->operation != CTDB_REPLY_CONTROL) {
831 DEBUG(10, ("Received invalid reply\n"));
832 goto fail;
834 reply = (struct ctdb_reply_control *)hdr;
836 reqid = reply->hdr.reqid;
838 DEBUG(10, ("Received reqid %d\n", (int)reqid));
840 for (i=0; i<num_pids; i++) {
841 if (reqid == reqids[i]) {
842 break;
845 if (i == num_pids) {
846 DEBUG(10, ("Received unknown record number %u\n",
847 (unsigned)reqid));
848 goto fail;
850 results[i] = ((reply->status) == 0);
851 TALLOC_FREE(reply);
852 num_received += 1;
855 result = true;
856 fail:
857 TALLOC_FREE(frame);
858 return result;
862 * Get a db path
864 char *ctdbd_dbpath(struct ctdbd_connection *conn,
865 TALLOC_CTX *mem_ctx, uint32_t db_id)
867 NTSTATUS status;
868 TDB_DATA data;
869 TDB_DATA rdata = {0};
870 int32_t cstatus = 0;
872 data.dptr = (uint8_t*)&db_id;
873 data.dsize = sizeof(db_id);
875 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
876 CTDB_CONTROL_GETDBPATH, 0, 0, data,
877 mem_ctx, &rdata, &cstatus);
878 if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
879 DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n"));
880 return NULL;
883 return (char *)rdata.dptr;
887 * attach to a ctdb database
889 NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
890 const char *name, uint32_t *db_id, int tdb_flags)
892 NTSTATUS status;
893 TDB_DATA data;
894 int32_t cstatus;
895 bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
897 data = string_term_tdb_data(name);
899 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
900 persistent
901 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
902 : CTDB_CONTROL_DB_ATTACH,
903 tdb_flags, 0, data, NULL, &data, &cstatus);
904 if (!NT_STATUS_IS_OK(status)) {
905 DEBUG(0, (__location__ " ctdb_control for db_attach "
906 "failed: %s\n", nt_errstr(status)));
907 return status;
910 if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
911 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
912 return NT_STATUS_INTERNAL_ERROR;
915 *db_id = *(uint32_t *)data.dptr;
916 talloc_free(data.dptr);
918 if (!(tdb_flags & TDB_SEQNUM)) {
919 return NT_STATUS_OK;
922 data.dptr = (uint8_t *)db_id;
923 data.dsize = sizeof(*db_id);
925 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
926 CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data,
927 NULL, NULL, &cstatus);
928 if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
929 DEBUG(0,(__location__ " ctdb_control for enable seqnum "
930 "failed\n"));
931 return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR :
932 status;
935 return NT_STATUS_OK;
939 * force the migration of a record to this node
941 NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
942 TDB_DATA key)
944 struct ctdb_req_call req;
945 struct ctdb_req_header *hdr;
946 struct iovec iov[2];
947 ssize_t nwritten;
948 NTSTATUS status;
949 int ret;
951 ZERO_STRUCT(req);
953 req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
954 req.hdr.ctdb_magic = CTDB_MAGIC;
955 req.hdr.ctdb_version = CTDB_PROTOCOL;
956 req.hdr.operation = CTDB_REQ_CALL;
957 req.hdr.reqid = ctdbd_next_reqid(conn);
958 req.flags = CTDB_IMMEDIATE_MIGRATION;
959 req.callid = CTDB_NULL_FUNC;
960 req.db_id = db_id;
961 req.keylen = key.dsize;
963 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
964 ctdb_packet_dump(&req.hdr);
966 iov[0].iov_base = &req;
967 iov[0].iov_len = offsetof(struct ctdb_req_call, data);
968 iov[1].iov_base = key.dptr;
969 iov[1].iov_len = key.dsize;
971 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
972 if (nwritten == -1) {
973 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
974 cluster_fatal("cluster dispatch daemon msg write error\n");
977 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
978 if (ret != 0) {
979 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
980 status = map_nt_error_from_unix(ret);
981 goto fail;
984 if (hdr->operation != CTDB_REPLY_CALL) {
985 DEBUG(0, ("received invalid reply\n"));
986 status = NT_STATUS_INTERNAL_ERROR;
987 goto fail;
990 status = NT_STATUS_OK;
991 fail:
993 TALLOC_FREE(hdr);
994 return status;
998 * Fetch a record and parse it
1000 NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
1001 TDB_DATA key, bool local_copy,
1002 void (*parser)(TDB_DATA key, TDB_DATA data,
1003 void *private_data),
1004 void *private_data)
1006 struct ctdb_req_call req;
1007 struct ctdb_req_header *hdr = NULL;
1008 struct ctdb_reply_call *reply;
1009 struct iovec iov[2];
1010 ssize_t nwritten;
1011 NTSTATUS status;
1012 uint32_t flags;
1013 int ret;
1015 flags = local_copy ? CTDB_WANT_READONLY : 0;
1017 ZERO_STRUCT(req);
1019 req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1020 req.hdr.ctdb_magic = CTDB_MAGIC;
1021 req.hdr.ctdb_version = CTDB_PROTOCOL;
1022 req.hdr.operation = CTDB_REQ_CALL;
1023 req.hdr.reqid = ctdbd_next_reqid(conn);
1024 req.flags = flags;
1025 req.callid = CTDB_FETCH_FUNC;
1026 req.db_id = db_id;
1027 req.keylen = key.dsize;
1029 iov[0].iov_base = &req;
1030 iov[0].iov_len = offsetof(struct ctdb_req_call, data);
1031 iov[1].iov_base = key.dptr;
1032 iov[1].iov_len = key.dsize;
1034 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1035 if (nwritten == -1) {
1036 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
1037 cluster_fatal("cluster dispatch daemon msg write error\n");
1040 ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
1041 if (ret != 0) {
1042 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
1043 status = map_nt_error_from_unix(ret);
1044 goto fail;
1047 if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
1048 DEBUG(0, ("received invalid reply\n"));
1049 status = NT_STATUS_INTERNAL_ERROR;
1050 goto fail;
1052 reply = (struct ctdb_reply_call *)hdr;
1054 if (reply->datalen == 0) {
1056 * Treat an empty record as non-existing
1058 status = NT_STATUS_NOT_FOUND;
1059 goto fail;
1062 parser(key, make_tdb_data(&reply->data[0], reply->datalen),
1063 private_data);
1065 status = NT_STATUS_OK;
1066 fail:
1067 TALLOC_FREE(hdr);
1068 return status;
1072 Traverse a ctdb database. This uses a kind-of hackish way to open a second
1073 connection to ctdbd to avoid the hairy recursive and async problems with
1074 everything in-line.
1077 NTSTATUS ctdbd_traverse(uint32_t db_id,
1078 void (*fn)(TDB_DATA key, TDB_DATA data,
1079 void *private_data),
1080 void *private_data)
1082 struct ctdbd_connection *conn;
1083 NTSTATUS status;
1085 TDB_DATA key, data;
1086 struct ctdb_traverse_start t;
1087 int cstatus;
1089 become_root();
1090 status = ctdbd_init_connection(NULL, &conn);
1091 unbecome_root();
1092 if (!NT_STATUS_IS_OK(status)) {
1093 DEBUG(0, ("ctdbd_init_connection failed: %s\n",
1094 nt_errstr(status)));
1095 return status;
1098 t.db_id = db_id;
1099 t.srvid = conn->rand_srvid;
1100 t.reqid = ctdbd_next_reqid(conn);
1102 data.dptr = (uint8_t *)&t;
1103 data.dsize = sizeof(t);
1105 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1106 CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
1107 data, NULL, NULL, &cstatus);
1109 if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1111 DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status),
1112 cstatus));
1114 if (NT_STATUS_IS_OK(status)) {
1116 * We need a mapping here
1118 status = NT_STATUS_UNSUCCESSFUL;
1120 TALLOC_FREE(conn);
1121 return status;
1124 while (True) {
1125 struct ctdb_req_header *hdr = NULL;
1126 struct ctdb_req_message *m;
1127 struct ctdb_rec_data *d;
1128 int ret;
1130 ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
1131 if (ret != 0) {
1132 DEBUG(0, ("ctdb_read_packet failed: %s\n",
1133 strerror(ret)));
1134 cluster_fatal("ctdbd died\n");
1137 if (hdr->operation != CTDB_REQ_MESSAGE) {
1138 DEBUG(0, ("Got operation %u, expected a message\n",
1139 (unsigned)hdr->operation));
1140 TALLOC_FREE(conn);
1141 return NT_STATUS_UNEXPECTED_IO_ERROR;
1144 m = (struct ctdb_req_message *)hdr;
1145 d = (struct ctdb_rec_data *)&m->data[0];
1146 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1147 DEBUG(0, ("Got invalid traverse data of length %d\n",
1148 (int)m->datalen));
1149 TALLOC_FREE(conn);
1150 return NT_STATUS_UNEXPECTED_IO_ERROR;
1153 key.dsize = d->keylen;
1154 key.dptr = &d->data[0];
1155 data.dsize = d->datalen;
1156 data.dptr = &d->data[d->keylen];
1158 if (key.dsize == 0 && data.dsize == 0) {
1159 /* end of traverse */
1160 TALLOC_FREE(conn);
1161 return NT_STATUS_OK;
1164 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1165 DEBUG(0, ("Got invalid ltdb header length %d\n",
1166 (int)data.dsize));
1167 TALLOC_FREE(conn);
1168 return NT_STATUS_UNEXPECTED_IO_ERROR;
1170 data.dsize -= sizeof(struct ctdb_ltdb_header);
1171 data.dptr += sizeof(struct ctdb_ltdb_header);
1173 if (fn != NULL) {
1174 fn(key, data, private_data);
1177 return NT_STATUS_OK;
/*
   This is used to canonicalize a ctdb_sock_addr structure.

   "out" receives a copy of "in". When IPv6 support is compiled in and
   "in" is an IPv4-mapped IPv6 address (::ffff:a.b.c.d), "out" is
   rewritten as the plain AF_INET address with the same port.
*/
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		/* the 96 bit prefix of an IPv4-mapped IPv6 address */
		static const uint8_t prefix[12] = {
			0,0,0,0,0,0,0,0,0,0,0xff,0xff
		};
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)out;

		if (memcmp(&in6->sin6_addr, prefix, sizeof(prefix)) == 0) {
			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			/*
			 * sin_len describes the sockaddr_in being built,
			 * not the sockaddr_storage it lives in.
			 */
			out4->sin_len = sizeof(*out4);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port   = in6->sin6_port;
			/* the last 4 bytes are the embedded IPv4 address */
			memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
		}
	}
#endif
}
1208 * Register us as a server for a particular tcp connection
1211 NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
1212 const struct sockaddr_storage *_server,
1213 const struct sockaddr_storage *_client,
1214 int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
1215 uint64_t dst_srvid,
1216 const uint8_t *msg, size_t msglen,
1217 void *private_data),
1218 void *private_data)
1220 struct ctdb_control_tcp_addr p;
1221 TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1222 NTSTATUS status;
1223 struct sockaddr_storage client;
1224 struct sockaddr_storage server;
1227 * Only one connection so far
1230 smbd_ctdb_canonicalize_ip(_client, &client);
1231 smbd_ctdb_canonicalize_ip(_server, &server);
1233 switch (client.ss_family) {
1234 case AF_INET:
1235 memcpy(&p.dest.ip, &server, sizeof(p.dest.ip));
1236 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1237 break;
1238 case AF_INET6:
1239 memcpy(&p.dest.ip6, &server, sizeof(p.dest.ip6));
1240 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1241 break;
1242 default:
1243 return NT_STATUS_INTERNAL_ERROR;
1247 * We want to be told about IP releases
1250 status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1251 cb, private_data);
1252 if (!NT_STATUS_IS_OK(status)) {
1253 return status;
1257 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1258 * can send an extra ack to trigger a reset for our client, so it
1259 * immediately reconnects
1261 return ctdbd_control(conn, CTDB_CURRENT_NODE,
1262 CTDB_CONTROL_TCP_CLIENT, 0,
1263 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
1267 call a control on the local node
1269 NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1270 uint64_t srvid, uint32_t flags, TDB_DATA data,
1271 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1272 int *cstatus)
1274 return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
1277 NTSTATUS ctdb_watch_us(struct ctdbd_connection *conn)
1279 struct ctdb_client_notify_register reg_data;
1280 size_t struct_len;
1281 NTSTATUS status;
1282 int cstatus;
1284 reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1285 reg_data.len = 1;
1286 reg_data.notify_data[0] = 0;
1288 struct_len = offsetof(struct ctdb_client_notify_register,
1289 notify_data) + reg_data.len;
1291 status = ctdbd_control_local(
1292 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1293 make_tdb_data((uint8_t *)&reg_data, struct_len),
1294 NULL, NULL, &cstatus);
1295 if (!NT_STATUS_IS_OK(status)) {
1296 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1297 nt_errstr(status)));
1299 return status;
1302 NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn)
1304 struct ctdb_client_notify_deregister dereg_data;
1305 NTSTATUS status;
1306 int cstatus;
1308 dereg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1310 status = ctdbd_control_local(
1311 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1312 make_tdb_data((uint8_t *)&dereg_data, sizeof(dereg_data)),
1313 NULL, NULL, &cstatus);
1314 if (!NT_STATUS_IS_OK(status)) {
1315 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1316 nt_errstr(status)));
1318 return status;
1321 NTSTATUS ctdbd_probe(void)
1324 * Do a very early check if ctdbd is around to avoid an abort and core
1325 * later
1327 struct ctdbd_connection *conn = NULL;
1328 NTSTATUS status;
1330 status = ctdbd_messaging_connection(talloc_tos(), &conn);
1333 * We only care if we can connect.
1335 TALLOC_FREE(conn);
1337 return status;