2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
25 #include "ctdbd_conn.h"
26 #include "system/select.h"
27 #include "lib/util/sys_rw_data.h"
28 #include "lib/util/iov_buf.h"
29 #include "lib/util/select.h"
30 #include "lib/util/debug.h"
31 #include "lib/util/talloc_stack.h"
32 #include "lib/util/genrand.h"
33 #include "lib/util/fault.h"
34 #include "lib/util/dlinklist.h"
35 #include "lib/util/tevent_unix.c"
36 #include "lib/util/sys_rw.h"
37 #include "lib/util/blocking.h"
38 #include "ctdb/include/ctdb_protocol.h"
40 /* paths to these include files come from --with-ctdb= in configure */
42 struct ctdbd_srvid_cb
{
44 int (*cb
)(uint32_t src_vnn
, uint32_t dst_vnn
,
46 const uint8_t *msg
, size_t msglen
,
51 struct ctdb_pkt_send_state
;
52 struct ctdb_pkt_recv_state
;
54 struct ctdbd_connection
{
58 struct ctdbd_srvid_cb
*callbacks
;
62 /* For async connections, enabled via ctdbd_setup_fde() */
63 struct tevent_fd
*fde
;
65 /* State to track in-progress read */
66 struct ctdb_read_state
{
67 /* Receive buffer for the initial packet length */
70 /* iovec state for current read */
75 /* allocated receive buffer based on packet length */
76 struct ctdb_req_header
*hdr
;
79 /* Lists of pending async reads and writes */
80 struct ctdb_pkt_recv_state
*recv_list
;
81 struct ctdb_pkt_send_state
*send_list
;
84 static void ctdbd_async_socket_handler(struct tevent_context
*ev
,
85 struct tevent_fd
*fde
,
89 static bool ctdbd_conn_has_async_sends(struct ctdbd_connection
*conn
)
91 return (conn
->send_list
!= NULL
);
94 static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection
*conn
)
96 return (conn
->fde
!= NULL
);
99 static uint32_t ctdbd_next_reqid(struct ctdbd_connection
*conn
)
102 if (conn
->reqid
== 0) {
108 static int ctdbd_control(struct ctdbd_connection
*conn
,
109 uint32_t vnn
, uint32_t opcode
,
110 uint64_t srvid
, uint32_t flags
,
112 TALLOC_CTX
*mem_ctx
, TDB_DATA
*outdata
,
116 * exit on fatal communications errors with the ctdbd daemon
118 static void cluster_fatal(const char *why
)
120 DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why
));
121 /* we don't use smb_panic() as we don't want to delay to write
122 a core file. We need to release this process id immediately
123 so that someone else can take over without getting sharing
131 static void ctdb_packet_dump(struct ctdb_req_header
*hdr
)
133 if (DEBUGLEVEL
< 11) {
136 DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
137 (int)hdr
->length
, (int)hdr
->ctdb_magic
,
138 (int)hdr
->ctdb_version
, (int)hdr
->generation
,
139 (int)hdr
->operation
, (int)hdr
->reqid
));
143 * Register a srvid with ctdbd
145 int register_with_ctdbd(struct ctdbd_connection
*conn
, uint64_t srvid
,
146 int (*cb
)(uint32_t src_vnn
, uint32_t dst_vnn
,
148 const uint8_t *msg
, size_t msglen
,
155 size_t num_callbacks
;
156 struct ctdbd_srvid_cb
*tmp
;
158 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_REGISTER_SRVID
, srvid
, 0,
159 tdb_null
, NULL
, NULL
, &cstatus
);
164 num_callbacks
= talloc_array_length(conn
->callbacks
);
166 tmp
= talloc_realloc(conn
, conn
->callbacks
, struct ctdbd_srvid_cb
,
171 conn
->callbacks
= tmp
;
173 conn
->callbacks
[num_callbacks
] = (struct ctdbd_srvid_cb
) {
174 .srvid
= srvid
, .cb
= cb
, .private_data
= private_data
180 static int ctdbd_msg_call_back(struct ctdbd_connection
*conn
,
181 struct ctdb_req_message_old
*msg
)
184 size_t i
, num_callbacks
;
186 msg_len
= msg
->hdr
.length
;
187 if (msg_len
< offsetof(struct ctdb_req_message_old
, data
)) {
188 DBG_DEBUG("len %"PRIu32
" too small\n", msg_len
);
191 msg_len
-= offsetof(struct ctdb_req_message_old
, data
);
193 if (msg_len
< msg
->datalen
) {
194 DBG_DEBUG("msg_len=%"PRIu32
" < msg->datalen=%"PRIu32
"\n",
195 msg_len
, msg
->datalen
);
199 num_callbacks
= talloc_array_length(conn
->callbacks
);
201 for (i
=0; i
<num_callbacks
; i
++) {
202 struct ctdbd_srvid_cb
*cb
= &conn
->callbacks
[i
];
204 if ((cb
->srvid
== msg
->srvid
) && (cb
->cb
!= NULL
)) {
207 ret
= cb
->cb(msg
->hdr
.srcnode
, msg
->hdr
.destnode
,
208 msg
->srvid
, msg
->data
, msg
->datalen
,
219 * get our vnn from the cluster
221 static int get_cluster_vnn(struct ctdbd_connection
*conn
, uint32_t *vnn
)
225 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_GET_PNN
, 0, 0,
226 tdb_null
, NULL
, NULL
, &cstatus
);
228 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret
)));
231 *vnn
= (uint32_t)cstatus
;
236 * Are we active (i.e. not banned or stopped?)
238 static bool ctdbd_working(struct ctdbd_connection
*conn
, uint32_t vnn
)
242 struct ctdb_node_map_old
*m
;
247 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_GET_NODEMAP
, 0, 0,
248 tdb_null
, talloc_tos(), &outdata
, &cstatus
);
250 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret
)));
253 if ((cstatus
!= 0) || (outdata
.dptr
== NULL
)) {
254 DEBUG(2, ("Received invalid ctdb data\n"));
258 m
= (struct ctdb_node_map_old
*)outdata
.dptr
;
260 for (i
=0; i
<m
->num
; i
++) {
261 if (vnn
== m
->nodes
[i
].pnn
) {
267 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
272 if ((m
->nodes
[i
].flags
& NODE_FLAGS_INACTIVE
) != 0) {
273 DEBUG(2, ("Node has status %x, not active\n",
274 (int)m
->nodes
[i
].flags
));
280 TALLOC_FREE(outdata
.dptr
);
284 uint32_t ctdbd_vnn(const struct ctdbd_connection
*conn
)
286 return conn
->our_vnn
;
290 * Get us a ctdb connection
293 static int ctdbd_connect(const char *sockname
, int *pfd
)
295 struct sockaddr_un addr
= { 0, };
300 fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
303 DEBUG(3, ("Could not create socket: %s\n", strerror(err
)));
307 addr
.sun_family
= AF_UNIX
;
309 namelen
= strlcpy(addr
.sun_path
, sockname
, sizeof(addr
.sun_path
));
310 if (namelen
>= sizeof(addr
.sun_path
)) {
311 DEBUG(3, ("%s: Socket name too long: %s\n", __func__
,
317 salen
= sizeof(struct sockaddr_un
);
319 if (connect(fd
, (struct sockaddr
*)(void *)&addr
, salen
) == -1) {
321 DEBUG(1, ("connect(%s) failed: %s\n", sockname
,
331 static int ctdb_read_packet(int fd
, int timeout
, TALLOC_CTX
*mem_ctx
,
332 struct ctdb_req_header
**result
)
334 struct ctdb_req_header
*req
;
339 struct pollfd pfd
= { .fd
= fd
, .events
= POLLIN
};
342 ret
= sys_poll_intr(&pfd
, 1, timeout
);
354 nread
= read_data(fd
, &msglen
, sizeof(msglen
));
362 if (msglen
< sizeof(struct ctdb_req_header
)) {
366 req
= talloc_size(mem_ctx
, msglen
);
370 talloc_set_name_const(req
, "struct ctdb_req_header");
372 req
->length
= msglen
;
374 nread
= read_data(fd
, ((char *)req
) + sizeof(msglen
),
375 msglen
- sizeof(msglen
));
390 * Read a full ctdbd request. If we have a messaging context, defer incoming
391 * messages that might come in between.
394 static int ctdb_read_req(struct ctdbd_connection
*conn
, uint32_t reqid
,
395 TALLOC_CTX
*mem_ctx
, struct ctdb_req_header
**result
)
397 struct ctdb_req_header
*hdr
;
402 ret
= ctdb_read_packet(conn
->fd
, conn
->timeout
, mem_ctx
, &hdr
);
404 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret
)));
405 cluster_fatal("ctdbd died\n");
408 DEBUG(11, ("Received ctdb packet\n"));
409 ctdb_packet_dump(hdr
);
411 if (hdr
->operation
== CTDB_REQ_MESSAGE
) {
412 struct ctdb_req_message_old
*msg
= (struct ctdb_req_message_old
*)hdr
;
414 ret
= ctdbd_msg_call_back(conn
, msg
);
424 if ((reqid
!= 0) && (hdr
->reqid
!= reqid
)) {
425 /* we got the wrong reply */
426 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
427 "been %u\n", hdr
->reqid
, reqid
));
432 *result
= talloc_move(mem_ctx
, &hdr
);
438 * This prepares conn for handling async requests
440 int ctdbd_setup_fde(struct ctdbd_connection
*conn
, struct tevent_context
*ev
)
444 ret
= set_blocking(conn
->fd
, false);
449 conn
->fde
= tevent_add_fd(ev
,
453 ctdbd_async_socket_handler
,
455 if (conn
->fde
== NULL
) {
462 static int ctdbd_connection_destructor(struct ctdbd_connection
*c
);
465 * Get us a ctdbd connection
468 static int ctdbd_init_connection_internal(TALLOC_CTX
*mem_ctx
,
469 const char *sockname
, int timeout
,
470 struct ctdbd_connection
*conn
)
474 conn
->timeout
= timeout
;
475 if (conn
->timeout
== 0) {
479 ret
= ctdbd_connect(sockname
, &conn
->fd
);
481 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret
)));
484 talloc_set_destructor(conn
, ctdbd_connection_destructor
);
486 ret
= get_cluster_vnn(conn
, &conn
->our_vnn
);
488 DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret
)));
492 if (!ctdbd_working(conn
, conn
->our_vnn
)) {
493 DEBUG(2, ("Node is not working, can not connect\n"));
497 generate_random_buffer((unsigned char *)&conn
->rand_srvid
,
498 sizeof(conn
->rand_srvid
));
500 ret
= register_with_ctdbd(conn
, conn
->rand_srvid
, NULL
, NULL
);
502 DEBUG(5, ("Could not register random srvid: %s\n",
510 int ctdbd_init_connection(TALLOC_CTX
*mem_ctx
,
511 const char *sockname
, int timeout
,
512 struct ctdbd_connection
**pconn
)
514 struct ctdbd_connection
*conn
;
517 if (!(conn
= talloc_zero(mem_ctx
, struct ctdbd_connection
))) {
518 DEBUG(0, ("talloc failed\n"));
522 ret
= ctdbd_init_connection_internal(mem_ctx
,
527 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
540 int ctdbd_reinit_connection(TALLOC_CTX
*mem_ctx
,
541 const char *sockname
, int timeout
,
542 struct ctdbd_connection
*conn
)
546 ret
= ctdbd_connection_destructor(conn
);
548 DBG_ERR("ctdbd_connection_destructor failed\n");
552 ret
= ctdbd_init_connection_internal(mem_ctx
,
557 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
565 int ctdbd_conn_get_fd(struct ctdbd_connection
*conn
)
571 * Packet handler to receive and handle a ctdb message
573 static int ctdb_handle_message(struct ctdbd_connection
*conn
,
574 struct ctdb_req_header
*hdr
)
576 struct ctdb_req_message_old
*msg
;
578 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
579 DEBUG(0, ("Received async msg of type %u, discarding\n",
584 msg
= (struct ctdb_req_message_old
*)hdr
;
586 ctdbd_msg_call_back(conn
, msg
);
591 void ctdbd_socket_readable(struct ctdbd_connection
*conn
)
593 struct ctdb_req_header
*hdr
= NULL
;
596 ret
= ctdb_read_packet(conn
->fd
, conn
->timeout
, talloc_tos(), &hdr
);
598 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret
)));
599 cluster_fatal("ctdbd died\n");
602 ret
= ctdb_handle_message(conn
, hdr
);
607 DEBUG(10, ("could not handle incoming message: %s\n",
612 static int ctdb_pkt_send_handler(struct ctdbd_connection
*conn
);
613 static int ctdb_pkt_recv_handler(struct ctdbd_connection
*conn
);
615 /* Used for async connection and async ctcb requests */
616 static void ctdbd_async_socket_handler(struct tevent_context
*ev
,
617 struct tevent_fd
*fde
,
621 struct ctdbd_connection
*conn
= talloc_get_type_abort(
622 private_data
, struct ctdbd_connection
);
625 if ((flags
& TEVENT_FD_READ
) != 0) {
626 ret
= ctdb_pkt_recv_handler(conn
);
628 DBG_DEBUG("ctdb_read_iov_handler returned %s\n",
634 if ((flags
& TEVENT_FD_WRITE
) != 0) {
635 ret
= ctdb_pkt_send_handler(conn
);
637 DBG_DEBUG("ctdb_write_iov_handler returned %s\n",
647 int ctdbd_messaging_send_iov(struct ctdbd_connection
*conn
,
648 uint32_t dst_vnn
, uint64_t dst_srvid
,
649 const struct iovec
*iov
, int iovlen
)
651 struct ctdb_req_message_old r
;
652 struct iovec iov2
[iovlen
+1];
653 size_t buflen
= iov_buflen(iov
, iovlen
);
656 r
.hdr
.length
= offsetof(struct ctdb_req_message_old
, data
) + buflen
;
657 r
.hdr
.ctdb_magic
= CTDB_MAGIC
;
658 r
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
659 r
.hdr
.generation
= 1;
660 r
.hdr
.operation
= CTDB_REQ_MESSAGE
;
661 r
.hdr
.destnode
= dst_vnn
;
662 r
.hdr
.srcnode
= conn
->our_vnn
;
667 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
668 ctdb_packet_dump(&r
.hdr
);
670 iov2
[0].iov_base
= &r
;
671 iov2
[0].iov_len
= offsetof(struct ctdb_req_message_old
, data
);
672 memcpy(&iov2
[1], iov
, iovlen
* sizeof(struct iovec
));
674 nwritten
= write_data_iov(conn
->fd
, iov2
, iovlen
+1);
675 if (nwritten
== -1) {
676 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
677 cluster_fatal("cluster dispatch daemon msg write error\n");
684 * send/recv a generic ctdb control message
686 static int ctdbd_control(struct ctdbd_connection
*conn
,
687 uint32_t vnn
, uint32_t opcode
,
688 uint64_t srvid
, uint32_t flags
,
690 TALLOC_CTX
*mem_ctx
, TDB_DATA
*outdata
,
693 struct ctdb_req_control_old req
;
694 struct ctdb_req_header
*hdr
;
695 struct ctdb_reply_control_old
*reply
= NULL
;
700 if (ctdbd_conn_has_async_reqs(conn
)) {
702 * Can't use sync call while an async call is in flight. Adding
703 * this check as a safety net. We'll be using different
704 * connections for sync and async requests, so this shouldn't
705 * happen, but who knows...
707 DBG_ERR("Async ctdb req on sync connection\n");
712 req
.hdr
.length
= offsetof(struct ctdb_req_control_old
, data
) + data
.dsize
;
713 req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
714 req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
715 req
.hdr
.operation
= CTDB_REQ_CONTROL
;
716 req
.hdr
.reqid
= ctdbd_next_reqid(conn
);
717 req
.hdr
.destnode
= vnn
;
720 req
.datalen
= data
.dsize
;
723 DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32
", vnn=%"PRIu32
", "
724 "opcode=%"PRIu32
", srvid=%"PRIu64
"\n", req
.hdr
.reqid
,
725 req
.hdr
.destnode
, req
.opcode
, req
.srvid
);
726 ctdb_packet_dump(&req
.hdr
);
728 iov
[0].iov_base
= &req
;
729 iov
[0].iov_len
= offsetof(struct ctdb_req_control_old
, data
);
730 iov
[1].iov_base
= data
.dptr
;
731 iov
[1].iov_len
= data
.dsize
;
733 nwritten
= write_data_iov(conn
->fd
, iov
, ARRAY_SIZE(iov
));
734 if (nwritten
== -1) {
735 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
736 cluster_fatal("cluster dispatch daemon msg write error\n");
739 if (flags
& CTDB_CTRL_FLAG_NOREPLY
) {
746 ret
= ctdb_read_req(conn
, req
.hdr
.reqid
, NULL
, &hdr
);
748 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret
)));
752 if (hdr
->operation
!= CTDB_REPLY_CONTROL
) {
753 DEBUG(0, ("received invalid reply\n"));
757 reply
= (struct ctdb_reply_control_old
*)hdr
;
760 if (!(outdata
->dptr
= (uint8_t *)talloc_memdup(
761 mem_ctx
, reply
->data
, reply
->datalen
))) {
765 outdata
->dsize
= reply
->datalen
;
768 (*cstatus
) = reply
->status
;
776 * see if a remote process exists
778 bool ctdbd_process_exists(struct ctdbd_connection
*conn
, uint32_t vnn
, pid_t pid
)
783 ret
= ctdbd_control(conn
, vnn
, CTDB_CONTROL_PROCESS_EXISTS
, 0, 0,
784 (TDB_DATA
) { .dptr
= (uint8_t *)&pid
,
785 .dsize
= sizeof(pid
) },
786 NULL
, NULL
, &cstatus
);
790 return (cstatus
== 0);
796 char *ctdbd_dbpath(struct ctdbd_connection
*conn
,
797 TALLOC_CTX
*mem_ctx
, uint32_t db_id
)
801 TDB_DATA rdata
= {0};
804 data
.dptr
= (uint8_t*)&db_id
;
805 data
.dsize
= sizeof(db_id
);
807 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_GETDBPATH
, 0, 0, data
,
808 mem_ctx
, &rdata
, &cstatus
);
809 if ((ret
!= 0) || cstatus
!= 0) {
810 DEBUG(0, (__location__
" ctdb_control for getdbpath failed: %s\n",
815 return (char *)rdata
.dptr
;
819 * attach to a ctdb database
821 int ctdbd_db_attach(struct ctdbd_connection
*conn
,
822 const char *name
, uint32_t *db_id
, int tdb_flags
)
827 bool persistent
= (tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0;
829 data
= string_term_tdb_data(name
);
831 ret
= ctdbd_control_local(conn
,
833 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
834 : CTDB_CONTROL_DB_ATTACH
,
835 0, 0, data
, NULL
, &data
, &cstatus
);
837 DEBUG(0, (__location__
" ctdb_control for db_attach "
838 "failed: %s\n", strerror(ret
)));
842 if (cstatus
!= 0 || data
.dsize
!= sizeof(uint32_t)) {
843 DEBUG(0,(__location__
" ctdb_control for db_attach failed\n"));
847 *db_id
= *(uint32_t *)data
.dptr
;
848 talloc_free(data
.dptr
);
854 * force the migration of a record to this node
856 int ctdbd_migrate(struct ctdbd_connection
*conn
, uint32_t db_id
, TDB_DATA key
)
858 struct ctdb_req_call_old req
;
859 struct ctdb_req_header
*hdr
= NULL
;
864 if (ctdbd_conn_has_async_reqs(conn
)) {
866 * Can't use sync call while an async call is in flight. Adding
867 * this check as a safety net. We'll be using different
868 * connections for sync and async requests, so this shouldn't
869 * happen, but who knows...
871 DBG_ERR("Async ctdb req on sync connection\n");
877 req
.hdr
.length
= offsetof(struct ctdb_req_call_old
, data
) + key
.dsize
;
878 req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
879 req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
880 req
.hdr
.operation
= CTDB_REQ_CALL
;
881 req
.hdr
.reqid
= ctdbd_next_reqid(conn
);
882 req
.flags
= CTDB_IMMEDIATE_MIGRATION
;
883 req
.callid
= CTDB_NULL_FUNC
;
885 req
.keylen
= key
.dsize
;
887 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
888 ctdb_packet_dump(&req
.hdr
);
890 iov
[0].iov_base
= &req
;
891 iov
[0].iov_len
= offsetof(struct ctdb_req_call_old
, data
);
892 iov
[1].iov_base
= key
.dptr
;
893 iov
[1].iov_len
= key
.dsize
;
895 nwritten
= write_data_iov(conn
->fd
, iov
, ARRAY_SIZE(iov
));
896 if (nwritten
== -1) {
897 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
898 cluster_fatal("cluster dispatch daemon msg write error\n");
901 ret
= ctdb_read_req(conn
, req
.hdr
.reqid
, NULL
, &hdr
);
903 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret
)));
907 if (hdr
->operation
!= CTDB_REPLY_CALL
) {
908 if (hdr
->operation
== CTDB_REPLY_ERROR
) {
909 DBG_ERR("received error from ctdb\n");
911 DBG_ERR("received invalid reply\n");
924 * Fetch a record and parse it
926 int ctdbd_parse(struct ctdbd_connection
*conn
, uint32_t db_id
,
927 TDB_DATA key
, bool local_copy
,
928 void (*parser
)(TDB_DATA key
, TDB_DATA data
,
932 struct ctdb_req_call_old req
;
933 struct ctdb_req_header
*hdr
= NULL
;
934 struct ctdb_reply_call_old
*reply
;
940 if (ctdbd_conn_has_async_reqs(conn
)) {
942 * Can't use sync call while an async call is in flight. Adding
943 * this check as a safety net. We'll be using different
944 * connections for sync and async requests, so this shouldn't
945 * happen, but who knows...
947 DBG_ERR("Async ctdb req on sync connection\n");
951 flags
= local_copy
? CTDB_WANT_READONLY
: 0;
955 req
.hdr
.length
= offsetof(struct ctdb_req_call_old
, data
) + key
.dsize
;
956 req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
957 req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
958 req
.hdr
.operation
= CTDB_REQ_CALL
;
959 req
.hdr
.reqid
= ctdbd_next_reqid(conn
);
961 req
.callid
= CTDB_FETCH_FUNC
;
963 req
.keylen
= key
.dsize
;
965 iov
[0].iov_base
= &req
;
966 iov
[0].iov_len
= offsetof(struct ctdb_req_call_old
, data
);
967 iov
[1].iov_base
= key
.dptr
;
968 iov
[1].iov_len
= key
.dsize
;
970 nwritten
= write_data_iov(conn
->fd
, iov
, ARRAY_SIZE(iov
));
971 if (nwritten
== -1) {
972 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
973 cluster_fatal("cluster dispatch daemon msg write error\n");
976 ret
= ctdb_read_req(conn
, req
.hdr
.reqid
, NULL
, &hdr
);
978 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret
)));
982 if ((hdr
== NULL
) || (hdr
->operation
!= CTDB_REPLY_CALL
)) {
983 DEBUG(0, ("received invalid reply\n"));
987 reply
= (struct ctdb_reply_call_old
*)hdr
;
989 if (reply
->datalen
== 0) {
991 * Treat an empty record as non-existing
997 parser(key
, make_tdb_data(&reply
->data
[0], reply
->datalen
),
1007 Traverse a ctdb database. "conn" must be an otherwise unused
1008 ctdb_connection where no other messages but the traverse ones are
1012 int ctdbd_traverse(struct ctdbd_connection
*conn
, uint32_t db_id
,
1013 void (*fn
)(TDB_DATA key
, TDB_DATA data
,
1014 void *private_data
),
1019 struct ctdb_traverse_start t
;
1022 if (ctdbd_conn_has_async_reqs(conn
)) {
1024 * Can't use sync call while an async call is in flight. Adding
1025 * this check as a safety net. We'll be using different
1026 * connections for sync and async requests, so this shouldn't
1027 * happen, but who knows...
1029 DBG_ERR("Async ctdb req on sync connection\n");
1034 t
.srvid
= conn
->rand_srvid
;
1035 t
.reqid
= ctdbd_next_reqid(conn
);
1037 data
.dptr
= (uint8_t *)&t
;
1038 data
.dsize
= sizeof(t
);
1040 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_TRAVERSE_START
,
1042 0, data
, NULL
, NULL
, &cstatus
);
1044 if ((ret
!= 0) || (cstatus
!= 0)) {
1045 DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret
),
1050 * We need a mapping here
1058 struct ctdb_req_header
*hdr
= NULL
;
1059 struct ctdb_req_message_old
*m
;
1060 struct ctdb_rec_data_old
*d
;
1062 ret
= ctdb_read_packet(conn
->fd
, conn
->timeout
, conn
, &hdr
);
1064 DEBUG(0, ("ctdb_read_packet failed: %s\n",
1066 cluster_fatal("ctdbd died\n");
1069 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
1070 DEBUG(0, ("Got operation %u, expected a message\n",
1071 (unsigned)hdr
->operation
));
1075 m
= (struct ctdb_req_message_old
*)hdr
;
1076 d
= (struct ctdb_rec_data_old
*)&m
->data
[0];
1077 if (m
->datalen
< sizeof(uint32_t) || m
->datalen
!= d
->length
) {
1078 DEBUG(0, ("Got invalid traverse data of length %d\n",
1083 key
.dsize
= d
->keylen
;
1084 key
.dptr
= &d
->data
[0];
1085 data
.dsize
= d
->datalen
;
1086 data
.dptr
= &d
->data
[d
->keylen
];
1088 if (key
.dsize
== 0 && data
.dsize
== 0) {
1089 /* end of traverse */
1093 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1094 DEBUG(0, ("Got invalid ltdb header length %d\n",
1098 data
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1099 data
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1102 fn(key
, data
, private_data
);
1109 This is used to canonicalize a ctdb_sock_addr structure.
1111 static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage
*in
,
1112 struct sockaddr_storage
*out
)
1114 memcpy(out
, in
, sizeof (*out
));
1117 if (in
->ss_family
== AF_INET6
) {
1118 const char prefix
[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
1119 const struct sockaddr_in6
*in6
=
1120 (const struct sockaddr_in6
*)in
;
1121 struct sockaddr_in
*out4
= (struct sockaddr_in
*)out
;
1122 if (memcmp(&in6
->sin6_addr
, prefix
, 12) == 0) {
1123 memset(out
, 0, sizeof(*out
));
1124 #ifdef HAVE_SOCK_SIN_LEN
1125 out4
->sin_len
= sizeof(*out
);
1127 out4
->sin_family
= AF_INET
;
1128 out4
->sin_port
= in6
->sin6_port
;
1129 memcpy(&out4
->sin_addr
, &in6
->sin6_addr
.s6_addr
[12], 4);
1136 * Register us as a server for a particular tcp connection
1139 int ctdbd_register_ips(struct ctdbd_connection
*conn
,
1140 const struct sockaddr_storage
*_server
,
1141 const struct sockaddr_storage
*_client
,
1142 int (*cb
)(uint32_t src_vnn
, uint32_t dst_vnn
,
1144 const uint8_t *msg
, size_t msglen
,
1145 void *private_data
),
1148 struct ctdb_connection p
;
1149 TDB_DATA data
= { .dptr
= (uint8_t *)&p
, .dsize
= sizeof(p
) };
1151 struct sockaddr_storage client
;
1152 struct sockaddr_storage server
;
1155 * Only one connection so far
1158 smbd_ctdb_canonicalize_ip(_client
, &client
);
1159 smbd_ctdb_canonicalize_ip(_server
, &server
);
1161 switch (client
.ss_family
) {
1163 memcpy(&p
.dst
.ip
, &server
, sizeof(p
.dst
.ip
));
1164 memcpy(&p
.src
.ip
, &client
, sizeof(p
.src
.ip
));
1167 memcpy(&p
.dst
.ip6
, &server
, sizeof(p
.dst
.ip6
));
1168 memcpy(&p
.src
.ip6
, &client
, sizeof(p
.src
.ip6
));
1175 * We want to be told about IP releases
1178 ret
= register_with_ctdbd(conn
, CTDB_SRVID_RELEASE_IP
,
1185 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1186 * can send an extra ack to trigger a reset for our client, so it
1187 * immediately reconnects
1189 ret
= ctdbd_control(conn
, CTDB_CURRENT_NODE
,
1190 CTDB_CONTROL_TCP_CLIENT
, 0,
1191 CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
,
1200 call a control on the local node
1202 int ctdbd_control_local(struct ctdbd_connection
*conn
, uint32_t opcode
,
1203 uint64_t srvid
, uint32_t flags
, TDB_DATA data
,
1204 TALLOC_CTX
*mem_ctx
, TDB_DATA
*outdata
,
1207 return ctdbd_control(conn
, CTDB_CURRENT_NODE
, opcode
, srvid
, flags
, data
,
1208 mem_ctx
, outdata
, cstatus
);
1211 int ctdb_watch_us(struct ctdbd_connection
*conn
)
1213 struct ctdb_notify_data_old reg_data
;
1218 reg_data
.srvid
= CTDB_SRVID_SAMBA_NOTIFY
;
1220 reg_data
.notify_data
[0] = 0;
1222 struct_len
= offsetof(struct ctdb_notify_data_old
,
1223 notify_data
) + reg_data
.len
;
1225 ret
= ctdbd_control_local(
1226 conn
, CTDB_CONTROL_REGISTER_NOTIFY
, conn
->rand_srvid
, 0,
1227 make_tdb_data((uint8_t *)®_data
, struct_len
),
1228 NULL
, NULL
, &cstatus
);
1230 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1236 int ctdb_unwatch(struct ctdbd_connection
*conn
)
1238 uint64_t srvid
= CTDB_SRVID_SAMBA_NOTIFY
;
1242 ret
= ctdbd_control_local(
1243 conn
, CTDB_CONTROL_DEREGISTER_NOTIFY
, conn
->rand_srvid
, 0,
1244 make_tdb_data((uint8_t *)&srvid
, sizeof(srvid
)),
1245 NULL
, NULL
, &cstatus
);
1247 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1253 int ctdbd_probe(const char *sockname
, int timeout
)
1256 * Do a very early check if ctdbd is around to avoid an abort and core
1259 struct ctdbd_connection
*conn
= NULL
;
1262 ret
= ctdbd_init_connection(talloc_tos(), sockname
, timeout
,
1266 * We only care if we can connect.
1273 struct ctdb_pkt_send_state
{
1274 struct ctdb_pkt_send_state
*prev
, *next
;
1275 struct tevent_context
*ev
;
1276 struct ctdbd_connection
*conn
;
1278 /* ctdb request id */
1281 /* the associated tevent request */
1282 struct tevent_req
*req
;
1284 /* iovec array with data to send */
1289 /* Initial packet length */
1293 static void ctdb_pkt_send_cleanup(struct tevent_req
*req
,
1294 enum tevent_req_state req_state
);
1297 * Asynchronously send a ctdb packet given as iovec array
1299 * Note: the passed iov array is not const here. Similar
1300 * functions in samba take a const array and create a copy
1301 * before calling iov_advance() on the array.
1303 * This function will modify the iov array! But
1304 * this is a static function and our only caller
1305 * ctdb_parse_send/recv is preparared for this to
1308 static struct tevent_req
*ctdb_pkt_send_send(TALLOC_CTX
*mem_ctx
,
1309 struct tevent_context
*ev
,
1310 struct ctdbd_connection
*conn
,
1314 enum dbwrap_req_state
*req_state
)
1316 struct tevent_req
*req
= NULL
;
1317 struct ctdb_pkt_send_state
*state
= NULL
;
1321 DBG_DEBUG("sending async ctdb reqid [%" PRIu32
"]\n", reqid
);
1323 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_pkt_send_state
);
1328 *state
= (struct ctdb_pkt_send_state
) {
1335 .packet_len
= iov_buflen(iov
, iovcnt
),
1338 tevent_req_set_cleanup_fn(req
, ctdb_pkt_send_cleanup
);
1340 *req_state
= DBWRAP_REQ_QUEUED
;
1342 if (ctdbd_conn_has_async_sends(conn
)) {
1344 * Can't attempt direct write with messages already queued and
1345 * possibly in progress
1347 DLIST_ADD_END(conn
->send_list
, state
);
1352 * Attempt a direct write. If this returns short, shedule the
1353 * remaining data as an async write, otherwise we're already done.
1356 nwritten
= writev(conn
->fd
, state
->iov
, state
->iovcnt
);
1357 if (nwritten
== state
->packet_len
) {
1358 DBG_DEBUG("Finished sending reqid [%" PRIu32
"]\n", reqid
);
1360 *req_state
= DBWRAP_REQ_DISPATCHED
;
1361 tevent_req_done(req
);
1362 return tevent_req_post(req
, ev
);
1365 if (nwritten
== -1) {
1366 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
1367 cluster_fatal("cluster write error\n");
1372 DBG_DEBUG("Posting async write of reqid [%" PRIu32
"]"
1373 "after short write [%zd]\n", reqid
, nwritten
);
1375 ok
= iov_advance(&state
->iov
, &state
->iovcnt
, nwritten
);
1377 *req_state
= DBWRAP_REQ_ERROR
;
1378 tevent_req_error(req
, EIO
);
1379 return tevent_req_post(req
, ev
);
1383 * As this is the first async write req we post, we must enable
1384 * fd-writable events.
1386 TEVENT_FD_WRITEABLE(conn
->fde
);
1387 DLIST_ADD_END(conn
->send_list
, state
);
1391 static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state
*state
)
1393 struct ctdbd_connection
*conn
= state
->conn
;
1399 if (state
->req
== NULL
) {
1400 DBG_DEBUG("Removing cancelled reqid [%" PRIu32
"]\n",
1403 DLIST_REMOVE(conn
->send_list
, state
);
1407 DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32
"]\n",
1410 talloc_reparent(state
->req
, conn
, state
);
1415 static void ctdb_pkt_send_cleanup(struct tevent_req
*req
,
1416 enum tevent_req_state req_state
)
1418 struct ctdb_pkt_send_state
*state
= tevent_req_data(
1419 req
, struct ctdb_pkt_send_state
);
1420 struct ctdbd_connection
*conn
= state
->conn
;
1421 size_t missing_len
= 0;
1427 missing_len
= iov_buflen(state
->iov
, state
->iovcnt
);
1428 if (state
->packet_len
== missing_len
) {
1430 * We haven't yet started sending this one, so we can just
1431 * remove it from the pending list
1435 if (missing_len
!= 0) {
1436 uint8_t *buf
= NULL
;
1438 if (req_state
!= TEVENT_REQ_RECEIVED
) {
1440 * Wait til the req_state is TEVENT_REQ_RECEIVED, as
1441 * that will be the final state when the request state
1442 * is talloc_free'd from tallloc_req_received(). Which
1443 * ensures we only run the following code *ONCE*!
1448 DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32
"]\n",
1451 * A request in progress of being sent. Reparent the iov buffer
1452 * so we can continue sending the request. See also the comment
1453 * in ctdbd_parse_send() when copying the key buffer.
1456 buf
= iov_concat(state
, state
->iov
, state
->iovcnt
);
1458 cluster_fatal("iov_concat error\n");
1463 state
->_iov
.iov_base
= buf
;
1464 state
->_iov
.iov_len
= missing_len
;
1465 state
->iov
= &state
->_iov
;
1467 talloc_set_destructor(state
, ctdb_pkt_send_state_destructor
);
1471 DBG_DEBUG("Removing pending reqid [%" PRIu32
"]\n", state
->reqid
);
1474 DLIST_REMOVE(conn
->send_list
, state
);
1476 if (!ctdbd_conn_has_async_sends(conn
)) {
1477 DBG_DEBUG("No more sends, disabling fd-writable events\n");
1478 TEVENT_FD_NOT_WRITEABLE(conn
->fde
);
1482 static int ctdb_pkt_send_handler(struct ctdbd_connection
*conn
)
1484 struct ctdb_pkt_send_state
*state
= NULL
;
1489 DBG_DEBUG("send handler\n");
1491 if (!ctdbd_conn_has_async_sends(conn
)) {
1492 DBG_WARNING("Writable fd-event without pending send\n");
1493 TEVENT_FD_NOT_WRITEABLE(conn
->fde
);
1497 state
= conn
->send_list
;
1498 iovlen
= iov_buflen(state
->iov
, state
->iovcnt
);
1500 nwritten
= writev(conn
->fd
, state
->iov
, state
->iovcnt
);
1501 if (nwritten
== -1) {
1502 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
1503 DBG_ERR("writev failed: %s\n", strerror(errno
));
1504 cluster_fatal("cluster write error\n");
1506 DBG_DEBUG("recoverable writev error, retry\n");
1510 if (nwritten
< iovlen
) {
1511 DBG_DEBUG("short write\n");
1513 ok
= iov_advance(&state
->iov
, &state
->iovcnt
, nwritten
);
1515 DBG_ERR("iov_advance failed\n");
1516 if (state
->req
== NULL
) {
1520 tevent_req_error(state
->req
, EIO
);
1526 if (state
->req
== NULL
) {
1527 DBG_DEBUG("Finished sending cancelled reqid [%" PRIu32
"]\n",
1533 DBG_DEBUG("Finished send request id [%" PRIu32
"]\n", state
->reqid
);
1535 tevent_req_done(state
->req
);
1539 static int ctdb_pkt_send_recv(struct tevent_req
*req
)
1543 if (tevent_req_is_unix_error(req
, &ret
)) {
1544 tevent_req_received(req
);
1548 tevent_req_received(req
);
1552 struct ctdb_pkt_recv_state
{
1553 struct ctdb_pkt_recv_state
*prev
, *next
;
1554 struct tevent_context
*ev
;
1555 struct ctdbd_connection
*conn
;
1557 /* ctdb request id */
1560 /* the associated tevent_req */
1561 struct tevent_req
*req
;
1563 /* pointer to allocated ctdb packet buffer */
1564 struct ctdb_req_header
*hdr
;
1567 static void ctdb_pkt_recv_cleanup(struct tevent_req
*req
,
1568 enum tevent_req_state req_state
);
1570 static struct tevent_req
*ctdb_pkt_recv_send(TALLOC_CTX
*mem_ctx
,
1571 struct tevent_context
*ev
,
1572 struct ctdbd_connection
*conn
,
1575 struct tevent_req
*req
= NULL
;
1576 struct ctdb_pkt_recv_state
*state
= NULL
;
1578 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_pkt_recv_state
);
1583 *state
= (struct ctdb_pkt_recv_state
) {
1590 tevent_req_set_cleanup_fn(req
, ctdb_pkt_recv_cleanup
);
1593 * fd-readable event is always set for the fde, no need to deal with
1597 DLIST_ADD_END(conn
->recv_list
, state
);
1598 DBG_DEBUG("Posted receive reqid [%" PRIu32
"]\n", state
->reqid
);
1603 static void ctdb_pkt_recv_cleanup(struct tevent_req
*req
,
1604 enum tevent_req_state req_state
)
1606 struct ctdb_pkt_recv_state
*state
= tevent_req_data(
1607 req
, struct ctdb_pkt_recv_state
);
1608 struct ctdbd_connection
*conn
= state
->conn
;
1614 DLIST_REMOVE(conn
->recv_list
, state
);
1617 static int ctdb_pkt_recv_handler(struct ctdbd_connection
*conn
)
1619 struct ctdb_pkt_recv_state
*state
= NULL
;
1624 DBG_DEBUG("receive handler\n");
1626 if (conn
->read_state
.iovs
== NULL
) {
1627 conn
->read_state
.iov
.iov_base
= &conn
->read_state
.msglen
;
1628 conn
->read_state
.iov
.iov_len
= sizeof(conn
->read_state
.msglen
);
1629 conn
->read_state
.iovs
= &conn
->read_state
.iov
;
1630 conn
->read_state
.iovcnt
= 1;
1633 iovlen
= iov_buflen(conn
->read_state
.iovs
, conn
->read_state
.iovcnt
);
1635 DBG_DEBUG("iovlen [%zd]\n", iovlen
);
1637 nread
= readv(conn
->fd
, conn
->read_state
.iovs
, conn
->read_state
.iovcnt
);
1639 cluster_fatal("cluster read error, peer closed connection\n");
1642 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
1643 cluster_fatal("cluster read error\n");
1645 DBG_DEBUG("recoverable error from readv, retry\n");
1649 if (nread
< iovlen
) {
1650 DBG_DEBUG("iovlen [%zd] nread [%zd]\n", iovlen
, nread
);
1651 ok
= iov_advance(&conn
->read_state
.iovs
,
1652 &conn
->read_state
.iovcnt
,
1660 conn
->read_state
.iovs
= NULL
;
1661 conn
->read_state
.iovcnt
= 0;
1663 if (conn
->read_state
.hdr
== NULL
) {
1665 * Going this way after reading the 4 initial byte message
1668 uint32_t msglen
= conn
->read_state
.msglen
;
1669 uint8_t *readbuf
= NULL
;
1672 DBG_DEBUG("msglen: %" PRIu32
"\n", msglen
);
1674 if (msglen
< sizeof(struct ctdb_req_header
)) {
1675 DBG_ERR("short message %" PRIu32
"\n", msglen
);
1679 conn
->read_state
.hdr
= talloc_size(conn
, msglen
);
1680 if (conn
->read_state
.hdr
== NULL
) {
1683 conn
->read_state
.hdr
->length
= msglen
;
1684 talloc_set_name_const(conn
->read_state
.hdr
,
1685 "struct ctdb_req_header");
1687 readbuf
= (uint8_t *)conn
->read_state
.hdr
+ sizeof(msglen
);
1688 readlen
= msglen
- sizeof(msglen
);
1690 conn
->read_state
.iov
.iov_base
= readbuf
;
1691 conn
->read_state
.iov
.iov_len
= readlen
;
1692 conn
->read_state
.iovs
= &conn
->read_state
.iov
;
1693 conn
->read_state
.iovcnt
= 1;
1695 DBG_DEBUG("Scheduled packet read size %zd\n", readlen
);
1700 * Searching a list here is expected to be cheap, as messages are
1701 * exepcted to be coming in more or less ordered and we should find the
1702 * waiting request near the beginning of the list.
1704 for (state
= conn
->recv_list
; state
!= NULL
; state
= state
->next
) {
1705 if (state
->reqid
== conn
->read_state
.hdr
->reqid
) {
1710 if (state
== NULL
) {
1711 DBG_ERR("Discarding async ctdb reqid %u\n",
1712 conn
->read_state
.hdr
->reqid
);
1713 TALLOC_FREE(conn
->read_state
.hdr
);
1714 ZERO_STRUCT(conn
->read_state
);
1718 DBG_DEBUG("Got reply for reqid [%" PRIu32
"]\n", state
->reqid
);
1720 state
->hdr
= talloc_move(state
, &conn
->read_state
.hdr
);
1721 ZERO_STRUCT(conn
->read_state
);
1722 tevent_req_done(state
->req
);
1726 static int ctdb_pkt_recv_recv(struct tevent_req
*req
,
1727 TALLOC_CTX
*mem_ctx
,
1728 struct ctdb_req_header
**_hdr
)
1730 struct ctdb_pkt_recv_state
*state
= tevent_req_data(
1731 req
, struct ctdb_pkt_recv_state
);
1734 if (tevent_req_is_unix_error(req
, &error
)) {
1735 DBG_ERR("ctdb_read_req failed %s\n", strerror(error
));
1736 tevent_req_received(req
);
1740 *_hdr
= talloc_move(mem_ctx
, &state
->hdr
);
1742 tevent_req_received(req
);
1746 static int ctdbd_connection_destructor(struct ctdbd_connection
*c
)
1748 TALLOC_FREE(c
->fde
);
1754 TALLOC_FREE(c
->read_state
.hdr
);
1755 ZERO_STRUCT(c
->read_state
);
1757 while (c
->send_list
!= NULL
) {
1758 struct ctdb_pkt_send_state
*send_state
= c
->send_list
;
1759 DLIST_REMOVE(c
->send_list
, send_state
);
1760 send_state
->conn
= NULL
;
1761 tevent_req_defer_callback(send_state
->req
, send_state
->ev
);
1762 tevent_req_error(send_state
->req
, EIO
);
1765 while (c
->recv_list
!= NULL
) {
1766 struct ctdb_pkt_recv_state
*recv_state
= c
->recv_list
;
1767 DLIST_REMOVE(c
->recv_list
, recv_state
);
1768 recv_state
->conn
= NULL
;
1769 tevent_req_defer_callback(recv_state
->req
, recv_state
->ev
);
1770 tevent_req_error(recv_state
->req
, EIO
);
1776 struct ctdbd_parse_state
{
1777 struct tevent_context
*ev
;
1778 struct ctdbd_connection
*conn
;
1781 uint8_t _keybuf
[64];
1782 struct ctdb_req_call_old ctdb_req
;
1783 struct iovec iov
[2];
1784 void (*parser
)(TDB_DATA key
,
1786 void *private_data
);
1788 enum dbwrap_req_state
*req_state
;
1791 static void ctdbd_parse_pkt_send_done(struct tevent_req
*subreq
);
1792 static void ctdbd_parse_done(struct tevent_req
*subreq
);
1794 struct tevent_req
*ctdbd_parse_send(TALLOC_CTX
*mem_ctx
,
1795 struct tevent_context
*ev
,
1796 struct ctdbd_connection
*conn
,
1800 void (*parser
)(TDB_DATA key
,
1802 void *private_data
),
1804 enum dbwrap_req_state
*req_state
)
1806 struct tevent_req
*req
= NULL
;
1807 struct ctdbd_parse_state
*state
= NULL
;
1809 uint32_t packet_length
;
1810 struct tevent_req
*subreq
= NULL
;
1812 req
= tevent_req_create(mem_ctx
, &state
, struct ctdbd_parse_state
);
1814 *req_state
= DBWRAP_REQ_ERROR
;
1818 *state
= (struct ctdbd_parse_state
) {
1821 .reqid
= ctdbd_next_reqid(conn
),
1823 .private_data
= private_data
,
1824 .req_state
= req_state
,
1827 flags
= local_copy
? CTDB_WANT_READONLY
: 0;
1828 packet_length
= offsetof(struct ctdb_req_call_old
, data
) + key
.dsize
;
1831 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires that
1832 * all passed iov elements have a lifetime longer that the tevent_req
1833 * returned by ctdb_pkt_send_send(). This is required continue sending a
1834 * the low level request into the ctdb socket, if a higher level
1835 * ('this') request is canceled (or talloc free'd) by the application
1836 * layer, without sending invalid packets to ctdb.
1838 if (key
.dsize
> sizeof(state
->_keybuf
)) {
1839 state
->key
.dptr
= talloc_memdup(state
, key
.dptr
, key
.dsize
);
1840 if (tevent_req_nomem(state
->key
.dptr
, req
)) {
1841 return tevent_req_post(req
, ev
);
1844 memcpy(state
->_keybuf
, key
.dptr
, key
.dsize
);
1845 state
->key
.dptr
= state
->_keybuf
;
1847 state
->key
.dsize
= key
.dsize
;
1849 state
->ctdb_req
.hdr
.length
= packet_length
;
1850 state
->ctdb_req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
1851 state
->ctdb_req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
1852 state
->ctdb_req
.hdr
.operation
= CTDB_REQ_CALL
;
1853 state
->ctdb_req
.hdr
.reqid
= state
->reqid
;
1854 state
->ctdb_req
.flags
= flags
;
1855 state
->ctdb_req
.callid
= CTDB_FETCH_FUNC
;
1856 state
->ctdb_req
.db_id
= db_id
;
1857 state
->ctdb_req
.keylen
= state
->key
.dsize
;
1859 state
->iov
[0].iov_base
= &state
->ctdb_req
;
1860 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_call_old
, data
);
1861 state
->iov
[1].iov_base
= state
->key
.dptr
;
1862 state
->iov
[1].iov_len
= state
->key
.dsize
;
1865 * Note that ctdb_pkt_send_send()
1866 * will modify state->iov using
1867 * iov_advance() without making a copy.
1869 subreq
= ctdb_pkt_send_send(state
,
1874 ARRAY_SIZE(state
->iov
),
1876 if (tevent_req_nomem(subreq
, req
)) {
1877 *req_state
= DBWRAP_REQ_ERROR
;
1878 return tevent_req_post(req
, ev
);
1880 tevent_req_set_callback(subreq
, ctdbd_parse_pkt_send_done
, req
);
1885 static void ctdbd_parse_pkt_send_done(struct tevent_req
*subreq
)
1887 struct tevent_req
*req
= tevent_req_callback_data(
1888 subreq
, struct tevent_req
);
1889 struct ctdbd_parse_state
*state
= tevent_req_data(
1890 req
, struct ctdbd_parse_state
);
1893 ret
= ctdb_pkt_send_recv(subreq
);
1894 TALLOC_FREE(subreq
);
1895 if (tevent_req_error(req
, ret
)) {
1896 DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret
));
1900 subreq
= ctdb_pkt_recv_send(state
,
1904 if (tevent_req_nomem(subreq
, req
)) {
1908 *state
->req_state
= DBWRAP_REQ_DISPATCHED
;
1909 tevent_req_set_callback(subreq
, ctdbd_parse_done
, req
);
1913 static void ctdbd_parse_done(struct tevent_req
*subreq
)
1915 struct tevent_req
*req
= tevent_req_callback_data(
1916 subreq
, struct tevent_req
);
1917 struct ctdbd_parse_state
*state
= tevent_req_data(
1918 req
, struct ctdbd_parse_state
);
1919 struct ctdb_req_header
*hdr
= NULL
;
1920 struct ctdb_reply_call_old
*reply
= NULL
;
1923 DBG_DEBUG("async parse request finished\n");
1925 ret
= ctdb_pkt_recv_recv(subreq
, state
, &hdr
);
1926 TALLOC_FREE(subreq
);
1927 if (tevent_req_error(req
, ret
)) {
1928 DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret
));
1932 if (hdr
->operation
!= CTDB_REPLY_CALL
) {
1933 DBG_ERR("received invalid reply\n");
1934 ctdb_packet_dump(hdr
);
1935 tevent_req_error(req
, EIO
);
1939 reply
= (struct ctdb_reply_call_old
*)hdr
;
1941 if (reply
->datalen
== 0) {
1943 * Treat an empty record as non-existing
1945 tevent_req_error(req
, ENOENT
);
1949 state
->parser(state
->key
,
1950 make_tdb_data(&reply
->data
[0], reply
->datalen
),
1951 state
->private_data
);
1953 tevent_req_done(req
);
1957 int ctdbd_parse_recv(struct tevent_req
*req
)
1961 if (tevent_req_is_unix_error(req
, &error
)) {
1962 DBG_DEBUG("async parse returned %s\n", strerror(error
));
1963 tevent_req_received(req
);
1967 tevent_req_received(req
);