s3:dbwrap: move the db_open_rbt() prototype to a new header dbwrap_rbt.h
[Samba.git] / source3 / lib / ctdbd_conn.c
blob21a417c00dd42a3e2670d7bb4da14857975331a6
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "includes.h"
22 #include "util_tdb.h"
24 #ifdef CLUSTER_SUPPORT
26 #include "ctdbd_conn.h"
27 #include "ctdb_packet.h"
28 #include "messages.h"
31 * It is not possible to include ctdb.h and tdb_compat.h (included via
32 * some other include above) without warnings. This fixes those
33 * warnings.
36 #ifdef typesafe_cb
37 #undef typesafe_cb
38 #endif
40 #ifdef typesafe_cb_preargs
41 #undef typesafe_cb_preargs
42 #endif
44 #ifdef typesafe_cb_postargs
45 #undef typesafe_cb_postargs
46 #endif
48 /* paths to these include files come from --with-ctdb= in configure */
50 #include "ctdb.h"
51 #include "ctdb_private.h"
53 struct ctdbd_connection {
54 struct messaging_context *msg_ctx;
55 uint32 reqid;
56 uint32 our_vnn;
57 uint64 rand_srvid;
58 struct ctdb_packet_context *pkt;
59 struct fd_event *fde;
61 void (*release_ip_handler)(const char *ip_addr, void *private_data);
62 void *release_ip_priv;
65 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
66 uint32_t vnn, uint32 opcode,
67 uint64_t srvid, uint32_t flags, TDB_DATA data,
68 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
69 int *cstatus);
/*
 * Terminate the process after an unrecoverable communication error
 * with the ctdbd daemon. Logs "why" at debug level 0 and does not return.
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));

	/*
	 * smb_panic() is deliberately avoided: writing a core file would
	 * delay the exit. This process id has to be released immediately
	 * so that someone else can take over without getting sharing
	 * violations.
	 */
	_exit(1);
}
87 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
89 if (DEBUGLEVEL < 10) {
90 return;
92 DEBUGADD(10, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
93 (int)hdr->length, (int)hdr->ctdb_magic,
94 (int)hdr->ctdb_version, (int)hdr->generation,
95 (int)hdr->operation, (int)hdr->reqid));
99 * Register a srvid with ctdbd
101 static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn,
102 uint64_t srvid)
105 int cstatus;
106 return ctdbd_control(conn, CTDB_CURRENT_NODE,
107 CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
108 tdb_null, NULL, NULL, &cstatus);
112 * get our vnn from the cluster
114 static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn)
116 int32_t cstatus=-1;
117 NTSTATUS status;
118 status = ctdbd_control(conn,
119 CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
120 tdb_null, NULL, NULL, &cstatus);
121 if (!NT_STATUS_IS_OK(status)) {
122 cluster_fatal("ctdbd_control failed\n");
124 *vnn = (uint32_t)cstatus;
125 return status;
129 * Are we active (i.e. not banned or stopped?)
131 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
133 int32_t cstatus=-1;
134 NTSTATUS status;
135 TDB_DATA outdata;
136 struct ctdb_node_map *m;
137 uint32_t failure_flags;
138 bool ret = false;
139 int i;
141 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
142 CTDB_CONTROL_GET_NODEMAP, 0, 0,
143 tdb_null, talloc_tos(), &outdata, &cstatus);
144 if (!NT_STATUS_IS_OK(status)) {
145 cluster_fatal("ctdbd_control failed\n");
147 if ((cstatus != 0) || (outdata.dptr == NULL)) {
148 DEBUG(2, ("Received invalid ctdb data\n"));
149 return false;
152 m = (struct ctdb_node_map *)outdata.dptr;
154 for (i=0; i<m->num; i++) {
155 if (vnn == m->nodes[i].pnn) {
156 break;
160 if (i == m->num) {
161 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
162 (int)vnn));
163 goto fail;
166 failure_flags = NODE_FLAGS_BANNED | NODE_FLAGS_DISCONNECTED
167 | NODE_FLAGS_PERMANENTLY_DISABLED | NODE_FLAGS_STOPPED;
169 if ((m->nodes[i].flags & failure_flags) != 0) {
170 DEBUG(2, ("Node has status %x, not active\n",
171 (int)m->nodes[i].flags));
172 goto fail;
175 ret = true;
176 fail:
177 TALLOC_FREE(outdata.dptr);
178 return ret;
181 uint32 ctdbd_vnn(const struct ctdbd_connection *conn)
183 return conn->our_vnn;
187 * Get us a ctdb connection
190 static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx,
191 struct ctdb_packet_context **presult)
193 struct ctdb_packet_context *result;
194 const char *sockname = lp_ctdbd_socket();
195 struct sockaddr_un addr;
196 int fd;
198 if (!sockname || !*sockname) {
199 sockname = CTDB_PATH;
202 fd = socket(AF_UNIX, SOCK_STREAM, 0);
203 if (fd == -1) {
204 DEBUG(3, ("Could not create socket: %s\n", strerror(errno)));
205 return map_nt_error_from_unix(errno);
208 ZERO_STRUCT(addr);
209 addr.sun_family = AF_UNIX;
210 strncpy(addr.sun_path, sockname, sizeof(addr.sun_path));
212 if (sys_connect(fd, (struct sockaddr *)(void *)&addr) == -1) {
213 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
214 strerror(errno)));
215 close(fd);
216 return map_nt_error_from_unix(errno);
219 if (!(result = ctdb_packet_init(mem_ctx, fd))) {
220 close(fd);
221 return NT_STATUS_NO_MEMORY;
224 *presult = result;
225 return NT_STATUS_OK;
229 * Do we have a complete ctdb packet in the queue?
232 static bool ctdb_req_complete(const uint8_t *buf, size_t available,
233 size_t *length,
234 void *private_data)
236 uint32 msglen;
238 if (available < sizeof(msglen)) {
239 return False;
242 msglen = *((uint32 *)buf);
244 DEBUG(10, ("msglen = %d\n", msglen));
246 if (msglen < sizeof(struct ctdb_req_header)) {
247 DEBUG(0, ("Got invalid msglen: %d, expected at least %d for "
248 "the req_header\n", (int)msglen,
249 (int)sizeof(struct ctdb_req_header)));
250 cluster_fatal("ctdbd protocol error\n");
253 if (available < msglen) {
254 return false;
257 *length = msglen;
258 return true;
/*
 * Everything needed to dispatch an incoming message later, because it
 * arrived while we were still waiting for a ctdb reply.
 */
struct deferred_msg_state {
	/* context the record is dispatched on */
	struct messaging_context *msg_ctx;
	/* the decoded message itself */
	struct messaging_rec *rec;
};
272 * Timed event handler for the deferred message
275 static void deferred_message_dispatch(struct event_context *event_ctx,
276 struct timed_event *te,
277 struct timeval now,
278 void *private_data)
280 struct deferred_msg_state *state = talloc_get_type_abort(
281 private_data, struct deferred_msg_state);
283 messaging_dispatch_rec(state->msg_ctx, state->rec);
284 TALLOC_FREE(state);
285 TALLOC_FREE(te);
288 struct req_pull_state {
289 TALLOC_CTX *mem_ctx;
290 DATA_BLOB req;
294 * Pull a ctdb request out of the incoming ctdb_packet queue
297 static NTSTATUS ctdb_req_pull(uint8_t *buf, size_t length,
298 void *private_data)
300 struct req_pull_state *state = (struct req_pull_state *)private_data;
302 state->req.data = talloc_move(state->mem_ctx, &buf);
303 state->req.length = length;
304 return NT_STATUS_OK;
308 * Fetch a messaging_rec from an incoming ctdb style message
311 static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
312 size_t overall_length,
313 struct ctdb_req_message *msg)
315 struct messaging_rec *result;
316 DATA_BLOB blob;
317 enum ndr_err_code ndr_err;
319 if ((overall_length < offsetof(struct ctdb_req_message, data))
320 || (overall_length
321 < offsetof(struct ctdb_req_message, data) + msg->datalen)) {
323 cluster_fatal("got invalid msg length");
326 if (!(result = talloc(mem_ctx, struct messaging_rec))) {
327 DEBUG(0, ("talloc failed\n"));
328 return NULL;
331 blob = data_blob_const(msg->data, msg->datalen);
333 ndr_err = ndr_pull_struct_blob(
334 &blob, result, result,
335 (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
337 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
338 DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
339 ndr_errstr(ndr_err)));
340 TALLOC_FREE(result);
341 return NULL;
344 if (DEBUGLEVEL >= 10) {
345 DEBUG(10, ("ctdb_pull_messaging_rec:\n"));
346 NDR_PRINT_DEBUG(messaging_rec, result);
349 return result;
352 static NTSTATUS ctdb_packet_fd_read_sync(struct ctdb_packet_context *ctx)
354 int timeout = lp_ctdb_timeout();
356 if (timeout == 0) {
357 timeout = -1;
359 return ctdb_packet_fd_read_sync_timeout(ctx, timeout);
363 * Read a full ctdbd request. If we have a messaging context, defer incoming
364 * messages that might come in between.
367 static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid,
368 TALLOC_CTX *mem_ctx, void *result)
370 struct ctdb_req_header *hdr;
371 struct req_pull_state state;
372 NTSTATUS status;
374 again:
376 status = ctdb_packet_fd_read_sync(conn->pkt);
378 if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) {
379 /* EAGAIN */
380 goto again;
381 } else if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
382 /* EAGAIN */
383 goto again;
386 if (!NT_STATUS_IS_OK(status)) {
387 DEBUG(0, ("ctdb_packet_fd_read failed: %s\n", nt_errstr(status)));
388 cluster_fatal("ctdbd died\n");
391 next_pkt:
393 ZERO_STRUCT(state);
394 state.mem_ctx = mem_ctx;
396 if (!ctdb_packet_handler(conn->pkt, ctdb_req_complete, ctdb_req_pull,
397 &state, &status)) {
399 * Not enough data
401 DEBUG(10, ("not enough data from ctdb socket, retrying\n"));
402 goto again;
405 if (!NT_STATUS_IS_OK(status)) {
406 DEBUG(0, ("Could not read ctdb_packet: %s\n", nt_errstr(status)));
407 cluster_fatal("ctdbd died\n");
410 hdr = (struct ctdb_req_header *)state.req.data;
412 DEBUG(10, ("Received ctdb packet\n"));
413 ctdb_packet_dump(hdr);
415 if (hdr->operation == CTDB_REQ_MESSAGE) {
416 struct timed_event *evt;
417 struct deferred_msg_state *msg_state;
418 struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
420 if (conn->msg_ctx == NULL) {
421 DEBUG(1, ("Got a message without having a msg ctx, "
422 "dropping msg %llu\n",
423 (long long unsigned)msg->srvid));
424 goto next_pkt;
427 if ((conn->release_ip_handler != NULL)
428 && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
429 /* must be dispatched immediately */
430 DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
431 conn->release_ip_handler((const char *)msg->data,
432 conn->release_ip_priv);
433 TALLOC_FREE(hdr);
434 goto next_pkt;
437 if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
438 || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)) {
440 DEBUG(1, ("ctdb_read_req: Got %s message\n",
441 (msg->srvid == CTDB_SRVID_RECONFIGURE)
442 ? "cluster reconfigure" : "SAMBA_NOTIFY"));
444 messaging_send(conn->msg_ctx,
445 messaging_server_id(conn->msg_ctx),
446 MSG_SMB_BRL_VALIDATE, &data_blob_null);
447 messaging_send(conn->msg_ctx,
448 messaging_server_id(conn->msg_ctx),
449 MSG_DBWRAP_G_LOCK_RETRY,
450 &data_blob_null);
451 TALLOC_FREE(hdr);
452 goto next_pkt;
455 msg_state = talloc(NULL, struct deferred_msg_state);
456 if (msg_state == NULL) {
457 DEBUG(0, ("talloc failed\n"));
458 TALLOC_FREE(hdr);
459 goto next_pkt;
462 if (!(msg_state->rec = ctdb_pull_messaging_rec(
463 msg_state, state.req.length, msg))) {
464 DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
465 TALLOC_FREE(msg_state);
466 TALLOC_FREE(hdr);
467 goto next_pkt;
470 TALLOC_FREE(hdr);
472 msg_state->msg_ctx = conn->msg_ctx;
475 * We're waiting for a call reply, but an async message has
476 * crossed. Defer dispatching to the toplevel event loop.
478 evt = event_add_timed(conn->msg_ctx->event_ctx,
479 conn->msg_ctx->event_ctx,
480 timeval_zero(),
481 deferred_message_dispatch,
482 msg_state);
483 if (evt == NULL) {
484 DEBUG(0, ("event_add_timed failed\n"));
485 TALLOC_FREE(msg_state);
486 TALLOC_FREE(hdr);
487 goto next_pkt;
490 goto next_pkt;
493 if (hdr->reqid != reqid) {
494 /* we got the wrong reply */
495 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
496 "been %u\n", hdr->reqid, reqid));
497 TALLOC_FREE(hdr);
498 goto again;
501 *((void **)result) = talloc_move(mem_ctx, &hdr);
503 return NT_STATUS_OK;
507 * Get us a ctdbd connection
510 static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
511 struct ctdbd_connection **pconn)
513 struct ctdbd_connection *conn;
514 NTSTATUS status;
516 if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
517 DEBUG(0, ("talloc failed\n"));
518 return NT_STATUS_NO_MEMORY;
521 status = ctdbd_connect(conn, &conn->pkt);
523 if (!NT_STATUS_IS_OK(status)) {
524 DEBUG(10, ("ctdbd_connect failed: %s\n", nt_errstr(status)));
525 goto fail;
528 status = get_cluster_vnn(conn, &conn->our_vnn);
530 if (!NT_STATUS_IS_OK(status)) {
531 DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status)));
532 goto fail;
535 if (!ctdbd_working(conn, conn->our_vnn)) {
536 DEBUG(2, ("Node is not working, can not connect\n"));
537 status = NT_STATUS_INTERNAL_DB_ERROR;
538 goto fail;
541 generate_random_buffer((unsigned char *)&conn->rand_srvid,
542 sizeof(conn->rand_srvid));
544 status = register_with_ctdbd(conn, conn->rand_srvid);
546 if (!NT_STATUS_IS_OK(status)) {
547 DEBUG(5, ("Could not register random srvid: %s\n",
548 nt_errstr(status)));
549 goto fail;
552 *pconn = conn;
553 return NT_STATUS_OK;
555 fail:
556 TALLOC_FREE(conn);
557 return status;
561 * Get us a ctdbd connection and register us as a process
564 NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
565 struct ctdbd_connection **pconn)
567 struct ctdbd_connection *conn;
568 NTSTATUS status;
570 status = ctdbd_init_connection(mem_ctx, &conn);
572 if (!NT_STATUS_IS_OK(status)) {
573 return status;
576 status = register_with_ctdbd(conn, (uint64_t)sys_getpid());
577 if (!NT_STATUS_IS_OK(status)) {
578 goto fail;
581 status = register_with_ctdbd(conn, MSG_SRVID_SAMBA);
582 if (!NT_STATUS_IS_OK(status)) {
583 goto fail;
586 status = register_with_ctdbd(conn, CTDB_SRVID_SAMBA_NOTIFY);
587 if (!NT_STATUS_IS_OK(status)) {
588 goto fail;
591 *pconn = conn;
592 return NT_STATUS_OK;
594 fail:
595 TALLOC_FREE(conn);
596 return status;
599 struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
601 return conn->msg_ctx;
604 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
606 return ctdb_packet_get_fd(conn->pkt);
610 * Packet handler to receive and handle a ctdb message
612 static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length,
613 void *private_data)
615 struct ctdbd_connection *conn = talloc_get_type_abort(
616 private_data, struct ctdbd_connection);
617 struct ctdb_req_message *msg;
618 struct messaging_rec *msg_rec;
620 msg = (struct ctdb_req_message *)buf;
622 if (msg->hdr.operation != CTDB_REQ_MESSAGE) {
623 DEBUG(0, ("Received async msg of type %u, discarding\n",
624 msg->hdr.operation));
625 TALLOC_FREE(buf);
626 return NT_STATUS_INVALID_PARAMETER;
629 if ((conn->release_ip_handler != NULL)
630 && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
631 /* must be dispatched immediately */
632 DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
633 conn->release_ip_handler((const char *)msg->data,
634 conn->release_ip_priv);
635 TALLOC_FREE(buf);
636 return NT_STATUS_OK;
639 SMB_ASSERT(conn->msg_ctx != NULL);
641 if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
642 || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)){
643 DEBUG(0,("Got cluster reconfigure message\n"));
645 * when the cluster is reconfigured or someone of the
646 * family has passed away (SAMBA_NOTIFY), we need to
647 * clean the brl database
649 messaging_send(conn->msg_ctx,
650 messaging_server_id(conn->msg_ctx),
651 MSG_SMB_BRL_VALIDATE, &data_blob_null);
653 messaging_send(conn->msg_ctx,
654 messaging_server_id(conn->msg_ctx),
655 MSG_DBWRAP_G_LOCK_RETRY,
656 &data_blob_null);
658 TALLOC_FREE(buf);
659 return NT_STATUS_OK;
662 /* only messages to our pid or the broadcast are valid here */
663 if (msg->srvid != sys_getpid() && msg->srvid != MSG_SRVID_SAMBA) {
664 DEBUG(0,("Got unexpected message with srvid=%llu\n",
665 (unsigned long long)msg->srvid));
666 TALLOC_FREE(buf);
667 return NT_STATUS_OK;
670 if (!(msg_rec = ctdb_pull_messaging_rec(NULL, length, msg))) {
671 DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
672 TALLOC_FREE(buf);
673 return NT_STATUS_NO_MEMORY;
676 messaging_dispatch_rec(conn->msg_ctx, msg_rec);
678 TALLOC_FREE(msg_rec);
679 TALLOC_FREE(buf);
680 return NT_STATUS_OK;
684 * The ctdbd socket is readable asynchronuously
687 static void ctdbd_socket_handler(struct event_context *event_ctx,
688 struct fd_event *event,
689 uint16 flags,
690 void *private_data)
692 struct ctdbd_connection *conn = talloc_get_type_abort(
693 private_data, struct ctdbd_connection);
695 NTSTATUS status;
697 status = ctdb_packet_fd_read(conn->pkt);
699 if (!NT_STATUS_IS_OK(status)) {
700 DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status)));
701 cluster_fatal("ctdbd died\n");
704 while (ctdb_packet_handler(conn->pkt, ctdb_req_complete,
705 ctdb_handle_message, conn, &status)) {
706 if (!NT_STATUS_IS_OK(status)) {
707 DEBUG(10, ("could not handle incoming message: %s\n",
708 nt_errstr(status)));
714 * Prepare a ctdbd connection to receive messages
717 NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
718 struct messaging_context *msg_ctx)
720 SMB_ASSERT(conn->msg_ctx == NULL);
721 SMB_ASSERT(conn->fde == NULL);
723 if (!(conn->fde = event_add_fd(msg_ctx->event_ctx, conn,
724 ctdb_packet_get_fd(conn->pkt),
725 EVENT_FD_READ,
726 ctdbd_socket_handler,
727 conn))) {
728 DEBUG(0, ("event_add_fd failed\n"));
729 return NT_STATUS_NO_MEMORY;
732 conn->msg_ctx = msg_ctx;
734 return NT_STATUS_OK;
738 * Send a messaging message across a ctdbd
741 NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn,
742 uint32 dst_vnn, uint64 dst_srvid,
743 struct messaging_rec *msg)
745 struct ctdb_req_message r;
746 TALLOC_CTX *mem_ctx;
747 DATA_BLOB blob;
748 NTSTATUS status;
749 enum ndr_err_code ndr_err;
751 if (!(mem_ctx = talloc_init("ctdbd_messaging_send"))) {
752 DEBUG(0, ("talloc failed\n"));
753 return NT_STATUS_NO_MEMORY;
756 ndr_err = ndr_push_struct_blob(
757 &blob, mem_ctx, msg,
758 (ndr_push_flags_fn_t)ndr_push_messaging_rec);
760 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
761 DEBUG(0, ("ndr_push_struct_blob failed: %s\n",
762 ndr_errstr(ndr_err)));
763 status = ndr_map_error2ntstatus(ndr_err);
764 goto fail;
767 r.hdr.length = offsetof(struct ctdb_req_message, data) + blob.length;
768 r.hdr.ctdb_magic = CTDB_MAGIC;
769 r.hdr.ctdb_version = CTDB_VERSION;
770 r.hdr.generation = 1;
771 r.hdr.operation = CTDB_REQ_MESSAGE;
772 r.hdr.destnode = dst_vnn;
773 r.hdr.srcnode = conn->our_vnn;
774 r.hdr.reqid = 0;
775 r.srvid = dst_srvid;
776 r.datalen = blob.length;
778 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
779 ctdb_packet_dump(&r.hdr);
781 status = ctdb_packet_send(
782 conn->pkt, 2,
783 data_blob_const(&r, offsetof(struct ctdb_req_message, data)),
784 blob);
786 if (!NT_STATUS_IS_OK(status)) {
787 DEBUG(0, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
788 goto fail;
791 status = ctdb_packet_flush(conn->pkt);
793 if (!NT_STATUS_IS_OK(status)) {
794 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
795 cluster_fatal("cluster dispatch daemon msg write error\n");
798 status = NT_STATUS_OK;
799 fail:
800 TALLOC_FREE(mem_ctx);
801 return status;
805 * send/recv a generic ctdb control message
807 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
808 uint32_t vnn, uint32 opcode,
809 uint64_t srvid, uint32_t flags,
810 TDB_DATA data,
811 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
812 int *cstatus)
814 struct ctdb_req_control req;
815 struct ctdb_reply_control *reply = NULL;
816 struct ctdbd_connection *new_conn = NULL;
817 NTSTATUS status;
819 if (conn == NULL) {
820 status = ctdbd_init_connection(NULL, &new_conn);
822 if (!NT_STATUS_IS_OK(status)) {
823 DEBUG(10, ("Could not init temp connection: %s\n",
824 nt_errstr(status)));
825 goto fail;
828 conn = new_conn;
831 ZERO_STRUCT(req);
832 req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize;
833 req.hdr.ctdb_magic = CTDB_MAGIC;
834 req.hdr.ctdb_version = CTDB_VERSION;
835 req.hdr.operation = CTDB_REQ_CONTROL;
836 req.hdr.reqid = ++conn->reqid;
837 req.hdr.destnode = vnn;
838 req.opcode = opcode;
839 req.srvid = srvid;
840 req.datalen = data.dsize;
841 req.flags = flags;
843 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
844 ctdb_packet_dump(&req.hdr);
846 status = ctdb_packet_send(
847 conn->pkt, 2,
848 data_blob_const(&req, offsetof(struct ctdb_req_control, data)),
849 data_blob_const(data.dptr, data.dsize));
851 if (!NT_STATUS_IS_OK(status)) {
852 DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
853 goto fail;
856 status = ctdb_packet_flush(conn->pkt);
858 if (!NT_STATUS_IS_OK(status)) {
859 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
860 cluster_fatal("cluster dispatch daemon control write error\n");
863 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
864 TALLOC_FREE(new_conn);
865 if (cstatus) {
866 *cstatus = 0;
868 return NT_STATUS_OK;
871 status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
873 if (!NT_STATUS_IS_OK(status)) {
874 DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
875 goto fail;
878 if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
879 DEBUG(0, ("received invalid reply\n"));
880 goto fail;
883 if (outdata) {
884 if (!(outdata->dptr = (uint8 *)talloc_memdup(
885 mem_ctx, reply->data, reply->datalen))) {
886 TALLOC_FREE(reply);
887 return NT_STATUS_NO_MEMORY;
889 outdata->dsize = reply->datalen;
891 if (cstatus) {
892 (*cstatus) = reply->status;
895 status = NT_STATUS_OK;
897 fail:
898 TALLOC_FREE(new_conn);
899 TALLOC_FREE(reply);
900 return status;
904 * see if a remote process exists
906 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid)
908 NTSTATUS status;
909 TDB_DATA data;
910 int32_t cstatus;
912 data.dptr = (uint8_t*)&pid;
913 data.dsize = sizeof(pid);
915 status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, 0,
916 data, NULL, NULL, &cstatus);
917 if (!NT_STATUS_IS_OK(status)) {
918 DEBUG(0, (__location__ " ctdb_control for process_exists "
919 "failed\n"));
920 return False;
923 return cstatus == 0;
927 * Get a db path
929 char *ctdbd_dbpath(struct ctdbd_connection *conn,
930 TALLOC_CTX *mem_ctx, uint32_t db_id)
932 NTSTATUS status;
933 TDB_DATA data;
934 int32_t cstatus;
936 data.dptr = (uint8_t*)&db_id;
937 data.dsize = sizeof(db_id);
939 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
940 CTDB_CONTROL_GETDBPATH, 0, 0, data,
941 mem_ctx, &data, &cstatus);
942 if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
943 DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n"));
944 return NULL;
947 return (char *)data.dptr;
951 * attach to a ctdb database
953 NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
954 const char *name, uint32_t *db_id, int tdb_flags)
956 NTSTATUS status;
957 TDB_DATA data;
958 int32_t cstatus;
959 bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
961 data.dptr = (uint8_t*)name;
962 data.dsize = strlen(name)+1;
964 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
965 persistent
966 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
967 : CTDB_CONTROL_DB_ATTACH,
968 tdb_flags, 0, data, NULL, &data, &cstatus);
969 if (!NT_STATUS_IS_OK(status)) {
970 DEBUG(0, (__location__ " ctdb_control for db_attach "
971 "failed: %s\n", nt_errstr(status)));
972 return status;
975 if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
976 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
977 return NT_STATUS_INTERNAL_ERROR;
980 *db_id = *(uint32_t *)data.dptr;
981 talloc_free(data.dptr);
983 if (!(tdb_flags & TDB_SEQNUM)) {
984 return NT_STATUS_OK;
987 data.dptr = (uint8_t *)db_id;
988 data.dsize = sizeof(*db_id);
990 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
991 CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data,
992 NULL, NULL, &cstatus);
993 if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
994 DEBUG(0,(__location__ " ctdb_control for enable seqnum "
995 "failed\n"));
996 return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR :
997 status;
1000 return NT_STATUS_OK;
1004 * force the migration of a record to this node
1006 NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id,
1007 TDB_DATA key)
1009 struct ctdb_req_call req;
1010 struct ctdb_reply_call *reply;
1011 NTSTATUS status;
1013 ZERO_STRUCT(req);
1015 req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1016 req.hdr.ctdb_magic = CTDB_MAGIC;
1017 req.hdr.ctdb_version = CTDB_VERSION;
1018 req.hdr.operation = CTDB_REQ_CALL;
1019 req.hdr.reqid = ++conn->reqid;
1020 req.flags = CTDB_IMMEDIATE_MIGRATION;
1021 req.callid = CTDB_NULL_FUNC;
1022 req.db_id = db_id;
1023 req.keylen = key.dsize;
1025 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
1026 ctdb_packet_dump(&req.hdr);
1028 status = ctdb_packet_send(
1029 conn->pkt, 2,
1030 data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
1031 data_blob_const(key.dptr, key.dsize));
1033 if (!NT_STATUS_IS_OK(status)) {
1034 DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
1035 return status;
1038 status = ctdb_packet_flush(conn->pkt);
1040 if (!NT_STATUS_IS_OK(status)) {
1041 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
1042 cluster_fatal("cluster dispatch daemon control write error\n");
1045 status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
1047 if (!NT_STATUS_IS_OK(status)) {
1048 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1049 goto fail;
1052 if (reply->hdr.operation != CTDB_REPLY_CALL) {
1053 DEBUG(0, ("received invalid reply\n"));
1054 status = NT_STATUS_INTERNAL_ERROR;
1055 goto fail;
1058 status = NT_STATUS_OK;
1059 fail:
1061 TALLOC_FREE(reply);
1062 return status;
1066 * remotely fetch a record without locking it or forcing a migration
1068 NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id,
1069 TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data)
1071 struct ctdb_req_call req;
1072 struct ctdb_reply_call *reply;
1073 NTSTATUS status;
1075 ZERO_STRUCT(req);
1077 req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1078 req.hdr.ctdb_magic = CTDB_MAGIC;
1079 req.hdr.ctdb_version = CTDB_VERSION;
1080 req.hdr.operation = CTDB_REQ_CALL;
1081 req.hdr.reqid = ++conn->reqid;
1082 req.flags = 0;
1083 req.callid = CTDB_FETCH_FUNC;
1084 req.db_id = db_id;
1085 req.keylen = key.dsize;
1087 status = ctdb_packet_send(
1088 conn->pkt, 2,
1089 data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
1090 data_blob_const(key.dptr, key.dsize));
1092 if (!NT_STATUS_IS_OK(status)) {
1093 DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
1094 return status;
1097 status = ctdb_packet_flush(conn->pkt);
1099 if (!NT_STATUS_IS_OK(status)) {
1100 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
1101 cluster_fatal("cluster dispatch daemon control write error\n");
1104 status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
1106 if (!NT_STATUS_IS_OK(status)) {
1107 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1108 goto fail;
1111 if (reply->hdr.operation != CTDB_REPLY_CALL) {
1112 DEBUG(0, ("received invalid reply\n"));
1113 status = NT_STATUS_INTERNAL_ERROR;
1114 goto fail;
1117 data->dsize = reply->datalen;
1118 if (data->dsize == 0) {
1119 data->dptr = NULL;
1120 goto done;
1123 data->dptr = (uint8 *)talloc_memdup(mem_ctx, &reply->data[0],
1124 reply->datalen);
1125 if (data->dptr == NULL) {
1126 DEBUG(0, ("talloc failed\n"));
1127 status = NT_STATUS_NO_MEMORY;
1128 goto fail;
1131 done:
1132 status = NT_STATUS_OK;
1133 fail:
1134 TALLOC_FREE(reply);
1135 return status;
1138 struct ctdbd_traverse_state {
1139 void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data);
1140 void *private_data;
1144 * Handle a traverse record coming in on the ctdbd connection
1147 static NTSTATUS ctdb_traverse_handler(uint8_t *buf, size_t length,
1148 void *private_data)
1150 struct ctdbd_traverse_state *state =
1151 (struct ctdbd_traverse_state *)private_data;
1153 struct ctdb_req_message *m;
1154 struct ctdb_rec_data *d;
1155 TDB_DATA key, data;
1157 m = (struct ctdb_req_message *)buf;
1159 if (length < sizeof(*m) || m->hdr.length != length) {
1160 DEBUG(0, ("Got invalid message of length %d\n", (int)length));
1161 TALLOC_FREE(buf);
1162 return NT_STATUS_UNEXPECTED_IO_ERROR;
1165 d = (struct ctdb_rec_data *)&m->data[0];
1166 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1167 DEBUG(0, ("Got invalid traverse data of length %d\n",
1168 (int)m->datalen));
1169 TALLOC_FREE(buf);
1170 return NT_STATUS_UNEXPECTED_IO_ERROR;
1173 key.dsize = d->keylen;
1174 key.dptr = &d->data[0];
1175 data.dsize = d->datalen;
1176 data.dptr = &d->data[d->keylen];
1178 if (key.dsize == 0 && data.dsize == 0) {
1179 /* end of traverse */
1180 return NT_STATUS_END_OF_FILE;
1183 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1184 DEBUG(0, ("Got invalid ltdb header length %d\n",
1185 (int)data.dsize));
1186 TALLOC_FREE(buf);
1187 return NT_STATUS_UNEXPECTED_IO_ERROR;
1189 data.dsize -= sizeof(struct ctdb_ltdb_header);
1190 data.dptr += sizeof(struct ctdb_ltdb_header);
1192 if (state->fn) {
1193 state->fn(key, data, state->private_data);
1196 TALLOC_FREE(buf);
1197 return NT_STATUS_OK;
1201 Traverse a ctdb database. This uses a kind-of hackish way to open a second
1202 connection to ctdbd to avoid the hairy recursive and async problems with
1203 everything in-line.
1206 NTSTATUS ctdbd_traverse(uint32 db_id,
1207 void (*fn)(TDB_DATA key, TDB_DATA data,
1208 void *private_data),
1209 void *private_data)
1211 struct ctdbd_connection *conn;
1212 NTSTATUS status;
1214 TDB_DATA data;
1215 struct ctdb_traverse_start t;
1216 int cstatus;
1217 struct ctdbd_traverse_state state;
1219 status = ctdbd_init_connection(NULL, &conn);
1220 if (!NT_STATUS_IS_OK(status)) {
1221 DEBUG(0, ("ctdbd_init_connection failed: %s\n",
1222 nt_errstr(status)));
1223 return status;
1226 t.db_id = db_id;
1227 t.srvid = conn->rand_srvid;
1228 t.reqid = ++conn->reqid;
1230 data.dptr = (uint8_t *)&t;
1231 data.dsize = sizeof(t);
1233 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1234 CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
1235 data, NULL, NULL, &cstatus);
1237 if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1239 DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status),
1240 cstatus));
1242 if (NT_STATUS_IS_OK(status)) {
1244 * We need a mapping here
1246 status = NT_STATUS_UNSUCCESSFUL;
1248 goto done;
1251 state.fn = fn;
1252 state.private_data = private_data;
1254 while (True) {
1256 status = NT_STATUS_OK;
1258 if (ctdb_packet_handler(conn->pkt, ctdb_req_complete,
1259 ctdb_traverse_handler, &state, &status)) {
1261 if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
1262 status = NT_STATUS_OK;
1263 break;
1267 * There might be more in the queue
1269 continue;
1272 if (!NT_STATUS_IS_OK(status)) {
1273 break;
1276 status = ctdb_packet_fd_read_sync(conn->pkt);
1278 if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
1280 * There might be more in the queue
1282 continue;
1285 if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
1286 status = NT_STATUS_OK;
1287 break;
1290 if (!NT_STATUS_IS_OK(status)) {
1291 DEBUG(0, ("ctdb_packet_fd_read_sync failed: %s\n", nt_errstr(status)));
1292 cluster_fatal("ctdbd died\n");
1296 done:
1297 TALLOC_FREE(conn);
1298 return status;
/*
  This is used to canonicalize a ctdb_sock_addr structure.

  Copies *in to *out. When built with HAVE_IPV6 and *in is an IPv6
  address carrying the IPv4-mapped prefix (::ffff:a.b.c.d), *out is
  instead filled with the equivalent AF_INET address and the same port.
*/
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		/* unsigned bytes: 0xff does not fit into a signed char */
		static const uint8_t prefix[12] = {
			0,0,0,0,0,0,0,0,0,0,0xff,0xff
		};
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)(const void *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)(void *)out;

		if (memcmp(&in6->sin6_addr, prefix, sizeof(prefix)) == 0) {
			struct in_addr v4;
			in_port_t port = in6->sin6_port;

			/* pick up everything needed from the input before
			 * the output is wiped; s6_addr is the portable
			 * byte view of the address */
			memcpy(&v4, &in6->sin6_addr.s6_addr[12], sizeof(v4));

			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			/* length of the address actually stored */
			out4->sin_len = sizeof(*out4);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port   = port;
			out4->sin_addr   = v4;
		}
	}
#endif
}
1328 * Register us as a server for a particular tcp connection
1331 NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
1332 const struct sockaddr_storage *_server,
1333 const struct sockaddr_storage *_client,
1334 void (*release_ip_handler)(const char *ip_addr,
1335 void *private_data),
1336 void *private_data)
1339 * we still use ctdb_control_tcp for ipv4
1340 * because we want to work against older ctdb
1341 * versions at runtime
1343 struct ctdb_control_tcp p4;
1344 #ifdef HAVE_STRUCT_CTDB_CONTROL_TCP_ADDR
1345 struct ctdb_control_tcp_addr p;
1346 #endif
1347 TDB_DATA data;
1348 NTSTATUS status;
1349 struct sockaddr_storage client;
1350 struct sockaddr_storage server;
1353 * Only one connection so far
1355 SMB_ASSERT(conn->release_ip_handler == NULL);
1357 smbd_ctdb_canonicalize_ip(_client, &client);
1358 smbd_ctdb_canonicalize_ip(_server, &server);
1360 switch (client.ss_family) {
1361 case AF_INET:
1362 p4.dest = *(struct sockaddr_in *)(void *)&server;
1363 p4.src = *(struct sockaddr_in *)(void *)&client;
1364 data.dptr = (uint8_t *)&p4;
1365 data.dsize = sizeof(p4);
1366 break;
1367 #ifdef HAVE_STRUCT_CTDB_CONTROL_TCP_ADDR
1368 case AF_INET6:
1369 p.dest.ip6 = *(struct sockaddr_in6 *)(void *)&server;
1370 p.src.ip6 = *(struct sockaddr_in6 *)(void *)&client;
1371 data.dptr = (uint8_t *)&p;
1372 data.dsize = sizeof(p);
1373 break;
1374 #endif
1375 default:
1376 return NT_STATUS_INTERNAL_ERROR;
1379 conn->release_ip_handler = release_ip_handler;
1380 conn->release_ip_priv = private_data;
1383 * We want to be told about IP releases
1386 status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP);
1387 if (!NT_STATUS_IS_OK(status)) {
1388 return status;
1392 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1393 * can send an extra ack to trigger a reset for our client, so it
1394 * immediately reconnects
1396 return ctdbd_control(conn, CTDB_CURRENT_NODE,
1397 CTDB_CONTROL_TCP_CLIENT, 0,
1398 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
1402 * We want to handle reconfigure events
1404 NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn)
1406 return register_with_ctdbd(conn, CTDB_SRVID_RECONFIGURE);
1410 call a control on the local node
1412 NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32 opcode,
1413 uint64_t srvid, uint32_t flags, TDB_DATA data,
1414 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1415 int *cstatus)
1417 return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
1420 NTSTATUS ctdb_watch_us(struct ctdbd_connection *conn)
1422 struct ctdb_client_notify_register reg_data;
1423 size_t struct_len;
1424 NTSTATUS status;
1425 int cstatus;
1427 reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1428 reg_data.len = 1;
1429 reg_data.notify_data[0] = 0;
1431 struct_len = offsetof(struct ctdb_client_notify_register,
1432 notify_data) + reg_data.len;
1434 status = ctdbd_control_local(
1435 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1436 make_tdb_data((uint8_t *)&reg_data, struct_len),
1437 NULL, NULL, &cstatus);
1438 if (!NT_STATUS_IS_OK(status)) {
1439 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1440 nt_errstr(status)));
1442 return status;
1445 NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn)
1447 struct ctdb_client_notify_deregister dereg_data;
1448 NTSTATUS status;
1449 int cstatus;
1451 dereg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1453 status = ctdbd_control_local(
1454 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1455 make_tdb_data((uint8_t *)&dereg_data, sizeof(dereg_data)),
1456 NULL, NULL, &cstatus);
1457 if (!NT_STATUS_IS_OK(status)) {
1458 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1459 nt_errstr(status)));
1461 return status;
1464 #endif