2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
25 #include "ctdbd_conn.h"
26 #include "system/select.h"
27 #include "lib/util/sys_rw_data.h"
28 #include "lib/util/iov_buf.h"
29 #include "lib/util/select.h"
30 #include "lib/util/debug.h"
31 #include "lib/util/talloc_stack.h"
32 #include "lib/util/genrand.h"
33 #include "lib/util/fault.h"
34 #include "lib/util/dlinklist.h"
35 #include "lib/util/tevent_unix.h"
36 #include "lib/util/sys_rw.h"
37 #include "lib/util/blocking.h"
38 #include "ctdb/include/ctdb_protocol.h"
40 /* paths to these include files come from --with-ctdb= in configure */
42 struct ctdbd_srvid_cb
{
44 int (*cb
)(struct tevent_context
*ev
,
45 uint32_t src_vnn
, uint32_t dst_vnn
,
47 const uint8_t *msg
, size_t msglen
,
52 struct ctdb_pkt_send_state
;
53 struct ctdb_pkt_recv_state
;
55 struct ctdbd_connection
{
59 struct ctdbd_srvid_cb
*callbacks
;
63 /* For async connections, enabled via ctdbd_setup_fde() */
64 struct tevent_fd
*fde
;
66 /* State to track in-progress read */
67 struct ctdb_read_state
{
68 /* Receive buffer for the initial packet length */
71 /* iovec state for current read */
76 /* allocated receive buffer based on packet length */
77 struct ctdb_req_header
*hdr
;
80 /* Lists of pending async reads and writes */
81 struct ctdb_pkt_recv_state
*recv_list
;
82 struct ctdb_pkt_send_state
*send_list
;
85 static void ctdbd_async_socket_handler(struct tevent_context
*ev
,
86 struct tevent_fd
*fde
,
90 static bool ctdbd_conn_has_async_sends(struct ctdbd_connection
*conn
)
92 return (conn
->send_list
!= NULL
);
95 static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection
*conn
)
97 return (conn
->fde
!= NULL
);
100 static uint32_t ctdbd_next_reqid(struct ctdbd_connection
*conn
)
103 if (conn
->reqid
== 0) {
109 static int ctdbd_control(struct ctdbd_connection
*conn
,
110 uint32_t vnn
, uint32_t opcode
,
111 uint64_t srvid
, uint32_t flags
,
113 TALLOC_CTX
*mem_ctx
, TDB_DATA
*outdata
,
117 * exit on fatal communications errors with the ctdbd daemon
119 static void cluster_fatal(const char *why
)
121 DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why
));
122 /* we don't use smb_panic() as we don't want to delay to write
123 a core file. We need to release this process id immediately
124 so that someone else can take over without getting sharing
132 static void ctdb_packet_dump(struct ctdb_req_header
*hdr
)
134 if (DEBUGLEVEL
< 11) {
137 DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
138 (int)hdr
->length
, (int)hdr
->ctdb_magic
,
139 (int)hdr
->ctdb_version
, (int)hdr
->generation
,
140 (int)hdr
->operation
, (int)hdr
->reqid
));
144 * Register a srvid with ctdbd
146 int register_with_ctdbd(struct ctdbd_connection
*conn
, uint64_t srvid
,
147 int (*cb
)(struct tevent_context
*ev
,
148 uint32_t src_vnn
, uint32_t dst_vnn
,
150 const uint8_t *msg
, size_t msglen
,
157 size_t num_callbacks
;
158 struct ctdbd_srvid_cb
*tmp
;
160 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_REGISTER_SRVID
, srvid
, 0,
161 tdb_null
, NULL
, NULL
, &cstatus
);
166 num_callbacks
= talloc_array_length(conn
->callbacks
);
168 tmp
= talloc_realloc(conn
, conn
->callbacks
, struct ctdbd_srvid_cb
,
173 conn
->callbacks
= tmp
;
175 conn
->callbacks
[num_callbacks
] = (struct ctdbd_srvid_cb
) {
176 .srvid
= srvid
, .cb
= cb
, .private_data
= private_data
182 static int ctdbd_msg_call_back(struct tevent_context
*ev
,
183 struct ctdbd_connection
*conn
,
184 struct ctdb_req_message_old
*msg
)
187 size_t i
, num_callbacks
;
189 msg_len
= msg
->hdr
.length
;
190 if (msg_len
< offsetof(struct ctdb_req_message_old
, data
)) {
191 DBG_DEBUG("len %"PRIu32
" too small\n", msg_len
);
194 msg_len
-= offsetof(struct ctdb_req_message_old
, data
);
196 if (msg_len
< msg
->datalen
) {
197 DBG_DEBUG("msg_len=%"PRIu32
" < msg->datalen=%"PRIu32
"\n",
198 msg_len
, msg
->datalen
);
202 num_callbacks
= talloc_array_length(conn
->callbacks
);
204 for (i
=0; i
<num_callbacks
; i
++) {
205 struct ctdbd_srvid_cb
*cb
= &conn
->callbacks
[i
];
207 if ((cb
->srvid
== msg
->srvid
) && (cb
->cb
!= NULL
)) {
211 msg
->hdr
.srcnode
, msg
->hdr
.destnode
,
212 msg
->srvid
, msg
->data
, msg
->datalen
,
223 * get our vnn from the cluster
225 static int get_cluster_vnn(struct ctdbd_connection
*conn
, uint32_t *vnn
)
229 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_GET_PNN
, 0, 0,
230 tdb_null
, NULL
, NULL
, &cstatus
);
232 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret
)));
235 *vnn
= (uint32_t)cstatus
;
240 * Are we active (i.e. not banned or stopped?)
242 static bool ctdbd_working(struct ctdbd_connection
*conn
, uint32_t vnn
)
246 struct ctdb_node_map_old
*m
;
251 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_GET_NODEMAP
, 0, 0,
252 tdb_null
, talloc_tos(), &outdata
, &cstatus
);
254 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret
)));
257 if ((cstatus
!= 0) || (outdata
.dptr
== NULL
)) {
258 DEBUG(2, ("Received invalid ctdb data\n"));
262 m
= (struct ctdb_node_map_old
*)outdata
.dptr
;
264 for (i
=0; i
<m
->num
; i
++) {
265 if (vnn
== m
->nodes
[i
].pnn
) {
271 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
276 if ((m
->nodes
[i
].flags
& NODE_FLAGS_INACTIVE
) != 0) {
277 DEBUG(2, ("Node has status %x, not active\n",
278 (int)m
->nodes
[i
].flags
));
284 TALLOC_FREE(outdata
.dptr
);
288 uint32_t ctdbd_vnn(const struct ctdbd_connection
*conn
)
290 return conn
->our_vnn
;
294 * Get us a ctdb connection
297 static int ctdbd_connect(const char *sockname
, int *pfd
)
299 struct sockaddr_un addr
= { 0, };
304 fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
307 DEBUG(3, ("Could not create socket: %s\n", strerror(err
)));
311 addr
.sun_family
= AF_UNIX
;
313 namelen
= strlcpy(addr
.sun_path
, sockname
, sizeof(addr
.sun_path
));
314 if (namelen
>= sizeof(addr
.sun_path
)) {
315 DEBUG(3, ("%s: Socket name too long: %s\n", __func__
,
321 salen
= sizeof(struct sockaddr_un
);
323 if (connect(fd
, (struct sockaddr
*)(void *)&addr
, salen
) == -1) {
325 DEBUG(1, ("connect(%s) failed: %s\n", sockname
,
335 static int ctdb_read_packet(int fd
, int timeout
, TALLOC_CTX
*mem_ctx
,
336 struct ctdb_req_header
**result
)
338 struct ctdb_req_header
*req
;
343 struct pollfd pfd
= { .fd
= fd
, .events
= POLLIN
};
346 ret
= sys_poll_intr(&pfd
, 1, timeout
);
358 nread
= read_data(fd
, &msglen
, sizeof(msglen
));
366 if (msglen
< sizeof(struct ctdb_req_header
)) {
370 req
= talloc_size(mem_ctx
, msglen
);
374 talloc_set_name_const(req
, "struct ctdb_req_header");
376 req
->length
= msglen
;
378 nread
= read_data(fd
, ((char *)req
) + sizeof(msglen
),
379 msglen
- sizeof(msglen
));
394 * Read a full ctdbd request. If we have a messaging context, defer incoming
395 * messages that might come in between.
398 static int ctdb_read_req(struct ctdbd_connection
*conn
, uint32_t reqid
,
399 TALLOC_CTX
*mem_ctx
, struct ctdb_req_header
**result
)
401 struct ctdb_req_header
*hdr
;
406 ret
= ctdb_read_packet(conn
->fd
, conn
->timeout
, mem_ctx
, &hdr
);
408 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret
));
409 cluster_fatal("failed to read data from ctdbd\n");
412 DEBUG(11, ("Received ctdb packet\n"));
413 ctdb_packet_dump(hdr
);
415 if (hdr
->operation
== CTDB_REQ_MESSAGE
) {
416 struct ctdb_req_message_old
*msg
= (struct ctdb_req_message_old
*)hdr
;
418 ret
= ctdbd_msg_call_back(NULL
, conn
, msg
);
428 if ((reqid
!= 0) && (hdr
->reqid
!= reqid
)) {
429 /* we got the wrong reply */
430 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
431 "been %u\n", hdr
->reqid
, reqid
));
436 *result
= talloc_move(mem_ctx
, &hdr
);
442 * This prepares conn for handling async requests
444 int ctdbd_setup_fde(struct ctdbd_connection
*conn
, struct tevent_context
*ev
)
448 ret
= set_blocking(conn
->fd
, false);
453 conn
->fde
= tevent_add_fd(ev
,
457 ctdbd_async_socket_handler
,
459 if (conn
->fde
== NULL
) {
466 static int ctdbd_connection_destructor(struct ctdbd_connection
*c
);
469 * Get us a ctdbd connection
472 static int ctdbd_init_connection_internal(TALLOC_CTX
*mem_ctx
,
473 const char *sockname
, int timeout
,
474 struct ctdbd_connection
*conn
)
478 conn
->timeout
= timeout
;
479 if (conn
->timeout
== 0) {
483 ret
= ctdbd_connect(sockname
, &conn
->fd
);
485 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret
)));
488 talloc_set_destructor(conn
, ctdbd_connection_destructor
);
490 ret
= get_cluster_vnn(conn
, &conn
->our_vnn
);
492 DEBUG(10, ("get_cluster_vnn failed: %s\n", strerror(ret
)));
496 if (!ctdbd_working(conn
, conn
->our_vnn
)) {
497 DEBUG(2, ("Node is not working, can not connect\n"));
501 generate_random_buffer((unsigned char *)&conn
->rand_srvid
,
502 sizeof(conn
->rand_srvid
));
504 ret
= register_with_ctdbd(conn
, conn
->rand_srvid
, NULL
, NULL
);
506 DEBUG(5, ("Could not register random srvid: %s\n",
514 int ctdbd_init_connection(TALLOC_CTX
*mem_ctx
,
515 const char *sockname
, int timeout
,
516 struct ctdbd_connection
**pconn
)
518 struct ctdbd_connection
*conn
;
521 if (!(conn
= talloc_zero(mem_ctx
, struct ctdbd_connection
))) {
522 DEBUG(0, ("talloc failed\n"));
526 ret
= ctdbd_init_connection_internal(mem_ctx
,
531 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
544 int ctdbd_reinit_connection(TALLOC_CTX
*mem_ctx
,
545 const char *sockname
, int timeout
,
546 struct ctdbd_connection
*conn
)
550 ret
= ctdbd_connection_destructor(conn
);
552 DBG_ERR("ctdbd_connection_destructor failed\n");
556 ret
= ctdbd_init_connection_internal(mem_ctx
,
561 DBG_ERR("ctdbd_init_connection_internal failed (%s)\n",
569 int ctdbd_conn_get_fd(struct ctdbd_connection
*conn
)
575 * Packet handler to receive and handle a ctdb message
577 static int ctdb_handle_message(struct tevent_context
*ev
,
578 struct ctdbd_connection
*conn
,
579 struct ctdb_req_header
*hdr
)
581 struct ctdb_req_message_old
*msg
;
583 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
584 DEBUG(0, ("Received async msg of type %u, discarding\n",
589 msg
= (struct ctdb_req_message_old
*)hdr
;
591 ctdbd_msg_call_back(ev
, conn
, msg
);
596 void ctdbd_socket_readable(struct tevent_context
*ev
,
597 struct ctdbd_connection
*conn
)
599 struct ctdb_req_header
*hdr
= NULL
;
602 ret
= ctdb_read_packet(conn
->fd
, conn
->timeout
, talloc_tos(), &hdr
);
604 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret
));
605 cluster_fatal("failed to read data from ctdbd\n");
608 ret
= ctdb_handle_message(ev
, conn
, hdr
);
613 DEBUG(10, ("could not handle incoming message: %s\n",
618 static int ctdb_pkt_send_handler(struct ctdbd_connection
*conn
);
619 static int ctdb_pkt_recv_handler(struct ctdbd_connection
*conn
);
621 /* Used for async connection and async ctdb requests */
622 static void ctdbd_async_socket_handler(struct tevent_context
*ev
,
623 struct tevent_fd
*fde
,
627 struct ctdbd_connection
*conn
= talloc_get_type_abort(
628 private_data
, struct ctdbd_connection
);
631 if ((flags
& TEVENT_FD_READ
) != 0) {
632 ret
= ctdb_pkt_recv_handler(conn
);
634 DBG_DEBUG("ctdb_read_iov_handler returned %s\n",
640 if ((flags
& TEVENT_FD_WRITE
) != 0) {
641 ret
= ctdb_pkt_send_handler(conn
);
643 DBG_DEBUG("ctdb_write_iov_handler returned %s\n",
653 int ctdbd_messaging_send_iov(struct ctdbd_connection
*conn
,
654 uint32_t dst_vnn
, uint64_t dst_srvid
,
655 const struct iovec
*iov
, int iovlen
)
657 struct ctdb_req_message_old r
;
658 struct iovec iov2
[iovlen
+1];
659 size_t buflen
= iov_buflen(iov
, iovlen
);
662 r
.hdr
.length
= offsetof(struct ctdb_req_message_old
, data
) + buflen
;
663 r
.hdr
.ctdb_magic
= CTDB_MAGIC
;
664 r
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
665 r
.hdr
.generation
= 1;
666 r
.hdr
.operation
= CTDB_REQ_MESSAGE
;
667 r
.hdr
.destnode
= dst_vnn
;
668 r
.hdr
.srcnode
= conn
->our_vnn
;
673 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
674 ctdb_packet_dump(&r
.hdr
);
676 iov2
[0].iov_base
= &r
;
677 iov2
[0].iov_len
= offsetof(struct ctdb_req_message_old
, data
);
678 memcpy(&iov2
[1], iov
, iovlen
* sizeof(struct iovec
));
680 nwritten
= write_data_iov(conn
->fd
, iov2
, iovlen
+1);
681 if (nwritten
== -1) {
682 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
683 cluster_fatal("cluster dispatch daemon msg write error\n");
690 * send/recv a generic ctdb control message
692 static int ctdbd_control(struct ctdbd_connection
*conn
,
693 uint32_t vnn
, uint32_t opcode
,
694 uint64_t srvid
, uint32_t flags
,
696 TALLOC_CTX
*mem_ctx
, TDB_DATA
*outdata
,
699 struct ctdb_req_control_old req
;
700 struct ctdb_req_header
*hdr
;
701 struct ctdb_reply_control_old
*reply
= NULL
;
706 if (ctdbd_conn_has_async_reqs(conn
)) {
708 * Can't use sync call while an async call is in flight. Adding
709 * this check as a safety net. We'll be using different
710 * connections for sync and async requests, so this shouldn't
711 * happen, but who knows...
713 DBG_ERR("Async ctdb req on sync connection\n");
718 req
.hdr
.length
= offsetof(struct ctdb_req_control_old
, data
) + data
.dsize
;
719 req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
720 req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
721 req
.hdr
.operation
= CTDB_REQ_CONTROL
;
722 req
.hdr
.reqid
= ctdbd_next_reqid(conn
);
723 req
.hdr
.destnode
= vnn
;
726 req
.datalen
= data
.dsize
;
729 DBG_DEBUG("Sending ctdb packet reqid=%"PRIu32
", vnn=%"PRIu32
", "
730 "opcode=%"PRIu32
", srvid=%"PRIu64
"\n", req
.hdr
.reqid
,
731 req
.hdr
.destnode
, req
.opcode
, req
.srvid
);
732 ctdb_packet_dump(&req
.hdr
);
734 iov
[0].iov_base
= &req
;
735 iov
[0].iov_len
= offsetof(struct ctdb_req_control_old
, data
);
736 iov
[1].iov_base
= data
.dptr
;
737 iov
[1].iov_len
= data
.dsize
;
739 nwritten
= write_data_iov(conn
->fd
, iov
, ARRAY_SIZE(iov
));
740 if (nwritten
== -1) {
741 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
742 cluster_fatal("cluster dispatch daemon msg write error\n");
745 if (flags
& CTDB_CTRL_FLAG_NOREPLY
) {
752 ret
= ctdb_read_req(conn
, req
.hdr
.reqid
, NULL
, &hdr
);
754 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret
)));
758 if (hdr
->operation
!= CTDB_REPLY_CONTROL
) {
759 DEBUG(0, ("received invalid reply\n"));
763 reply
= (struct ctdb_reply_control_old
*)hdr
;
766 if (!(outdata
->dptr
= (uint8_t *)talloc_memdup(
767 mem_ctx
, reply
->data
, reply
->datalen
))) {
771 outdata
->dsize
= reply
->datalen
;
774 (*cstatus
) = reply
->status
;
782 * see if a remote process exists
784 bool ctdbd_process_exists(struct ctdbd_connection
*conn
, uint32_t vnn
,
785 pid_t pid
, uint64_t unique_id
)
787 uint8_t buf
[sizeof(pid
)+sizeof(unique_id
)];
791 if (unique_id
== SERVERID_UNIQUE_ID_NOT_TO_VERIFY
) {
792 ret
= ctdbd_control(conn
, vnn
, CTDB_CONTROL_PROCESS_EXISTS
,
794 (TDB_DATA
) { .dptr
= (uint8_t *)&pid
,
795 .dsize
= sizeof(pid
) },
796 NULL
, NULL
, &cstatus
);
800 return (cstatus
== 0);
803 memcpy(buf
, &pid
, sizeof(pid
));
804 memcpy(buf
+sizeof(pid
), &unique_id
, sizeof(unique_id
));
806 ret
= ctdbd_control(conn
, vnn
, CTDB_CONTROL_CHECK_PID_SRVID
, 0, 0,
807 (TDB_DATA
) { .dptr
= buf
, .dsize
= sizeof(buf
) },
808 NULL
, NULL
, &cstatus
);
812 return (cstatus
== 0);
818 char *ctdbd_dbpath(struct ctdbd_connection
*conn
,
819 TALLOC_CTX
*mem_ctx
, uint32_t db_id
)
823 TDB_DATA rdata
= {0};
826 data
.dptr
= (uint8_t*)&db_id
;
827 data
.dsize
= sizeof(db_id
);
829 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_GETDBPATH
, 0, 0, data
,
830 mem_ctx
, &rdata
, &cstatus
);
831 if ((ret
!= 0) || cstatus
!= 0) {
832 DEBUG(0, (__location__
" ctdb_control for getdbpath failed: %s\n",
837 return (char *)rdata
.dptr
;
841 * attach to a ctdb database
843 int ctdbd_db_attach(struct ctdbd_connection
*conn
,
844 const char *name
, uint32_t *db_id
, bool persistent
)
850 data
= string_term_tdb_data(name
);
852 ret
= ctdbd_control_local(conn
,
854 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
855 : CTDB_CONTROL_DB_ATTACH
,
856 0, 0, data
, NULL
, &data
, &cstatus
);
858 DEBUG(0, (__location__
" ctdb_control for db_attach "
859 "failed: %s\n", strerror(ret
)));
863 if (cstatus
!= 0 || data
.dsize
!= sizeof(uint32_t)) {
864 DEBUG(0,(__location__
" ctdb_control for db_attach failed\n"));
868 *db_id
= *(uint32_t *)data
.dptr
;
869 talloc_free(data
.dptr
);
875 * force the migration of a record to this node
877 int ctdbd_migrate(struct ctdbd_connection
*conn
, uint32_t db_id
, TDB_DATA key
)
879 struct ctdb_req_call_old req
;
880 struct ctdb_req_header
*hdr
= NULL
;
885 if (ctdbd_conn_has_async_reqs(conn
)) {
887 * Can't use sync call while an async call is in flight. Adding
888 * this check as a safety net. We'll be using different
889 * connections for sync and async requests, so this shouldn't
890 * happen, but who knows...
892 DBG_ERR("Async ctdb req on sync connection\n");
898 req
.hdr
.length
= offsetof(struct ctdb_req_call_old
, data
) + key
.dsize
;
899 req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
900 req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
901 req
.hdr
.operation
= CTDB_REQ_CALL
;
902 req
.hdr
.reqid
= ctdbd_next_reqid(conn
);
903 req
.flags
= CTDB_IMMEDIATE_MIGRATION
;
904 req
.callid
= CTDB_NULL_FUNC
;
906 req
.keylen
= key
.dsize
;
908 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
909 ctdb_packet_dump(&req
.hdr
);
911 iov
[0].iov_base
= &req
;
912 iov
[0].iov_len
= offsetof(struct ctdb_req_call_old
, data
);
913 iov
[1].iov_base
= key
.dptr
;
914 iov
[1].iov_len
= key
.dsize
;
916 nwritten
= write_data_iov(conn
->fd
, iov
, ARRAY_SIZE(iov
));
917 if (nwritten
== -1) {
918 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
919 cluster_fatal("cluster dispatch daemon msg write error\n");
922 ret
= ctdb_read_req(conn
, req
.hdr
.reqid
, NULL
, &hdr
);
924 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret
)));
928 if (hdr
->operation
!= CTDB_REPLY_CALL
) {
929 if (hdr
->operation
== CTDB_REPLY_ERROR
) {
930 DBG_ERR("received error from ctdb\n");
932 DBG_ERR("received invalid reply\n");
945 * Fetch a record and parse it
947 int ctdbd_parse(struct ctdbd_connection
*conn
, uint32_t db_id
,
948 TDB_DATA key
, bool local_copy
,
949 void (*parser
)(TDB_DATA key
, TDB_DATA data
,
953 struct ctdb_req_call_old req
;
954 struct ctdb_req_header
*hdr
= NULL
;
955 struct ctdb_reply_call_old
*reply
;
961 if (ctdbd_conn_has_async_reqs(conn
)) {
963 * Can't use sync call while an async call is in flight. Adding
964 * this check as a safety net. We'll be using different
965 * connections for sync and async requests, so this shouldn't
966 * happen, but who knows...
968 DBG_ERR("Async ctdb req on sync connection\n");
972 flags
= local_copy
? CTDB_WANT_READONLY
: 0;
976 req
.hdr
.length
= offsetof(struct ctdb_req_call_old
, data
) + key
.dsize
;
977 req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
978 req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
979 req
.hdr
.operation
= CTDB_REQ_CALL
;
980 req
.hdr
.reqid
= ctdbd_next_reqid(conn
);
982 req
.callid
= CTDB_FETCH_FUNC
;
984 req
.keylen
= key
.dsize
;
986 iov
[0].iov_base
= &req
;
987 iov
[0].iov_len
= offsetof(struct ctdb_req_call_old
, data
);
988 iov
[1].iov_base
= key
.dptr
;
989 iov
[1].iov_len
= key
.dsize
;
991 nwritten
= write_data_iov(conn
->fd
, iov
, ARRAY_SIZE(iov
));
992 if (nwritten
== -1) {
993 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno
)));
994 cluster_fatal("cluster dispatch daemon msg write error\n");
997 ret
= ctdb_read_req(conn
, req
.hdr
.reqid
, NULL
, &hdr
);
999 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret
)));
1003 if ((hdr
== NULL
) || (hdr
->operation
!= CTDB_REPLY_CALL
)) {
1004 DEBUG(0, ("received invalid reply\n"));
1008 reply
= (struct ctdb_reply_call_old
*)hdr
;
1010 if (reply
->datalen
== 0) {
1012 * Treat an empty record as non-existing
1018 parser(key
, make_tdb_data(&reply
->data
[0], reply
->datalen
),
1028 Traverse a ctdb database. "conn" must be an otherwise unused
1029 ctdbd_connection where no other messages but the traverse ones are
1033 int ctdbd_traverse(struct ctdbd_connection
*conn
, uint32_t db_id
,
1034 void (*fn
)(TDB_DATA key
, TDB_DATA data
,
1035 void *private_data
),
1040 struct ctdb_traverse_start t
;
1043 if (ctdbd_conn_has_async_reqs(conn
)) {
1045 * Can't use sync call while an async call is in flight. Adding
1046 * this check as a safety net. We'll be using different
1047 * connections for sync and async requests, so this shouldn't
1048 * happen, but who knows...
1050 DBG_ERR("Async ctdb req on sync connection\n");
1055 t
.srvid
= conn
->rand_srvid
;
1056 t
.reqid
= ctdbd_next_reqid(conn
);
1058 data
.dptr
= (uint8_t *)&t
;
1059 data
.dsize
= sizeof(t
);
1061 ret
= ctdbd_control_local(conn
, CTDB_CONTROL_TRAVERSE_START
,
1063 0, data
, NULL
, NULL
, &cstatus
);
1065 if ((ret
!= 0) || (cstatus
!= 0)) {
1066 DEBUG(0,("ctdbd_control failed: %s, %d\n", strerror(ret
),
1071 * We need a mapping here
1079 struct ctdb_req_header
*hdr
= NULL
;
1080 struct ctdb_req_message_old
*m
;
1081 struct ctdb_rec_data_old
*d
;
1083 ret
= ctdb_read_packet(conn
->fd
, conn
->timeout
, conn
, &hdr
);
1085 DBG_ERR("ctdb_read_packet failed: %s\n", strerror(ret
));
1086 cluster_fatal("failed to read data from ctdbd\n");
1089 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
1090 DEBUG(0, ("Got operation %u, expected a message\n",
1091 (unsigned)hdr
->operation
));
1095 m
= (struct ctdb_req_message_old
*)hdr
;
1096 d
= (struct ctdb_rec_data_old
*)&m
->data
[0];
1097 if (m
->datalen
< sizeof(uint32_t) || m
->datalen
!= d
->length
) {
1098 DEBUG(0, ("Got invalid traverse data of length %d\n",
1103 key
.dsize
= d
->keylen
;
1104 key
.dptr
= &d
->data
[0];
1105 data
.dsize
= d
->datalen
;
1106 data
.dptr
= &d
->data
[d
->keylen
];
1108 if (key
.dsize
== 0 && data
.dsize
== 0) {
1109 /* end of traverse */
1113 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1114 DEBUG(0, ("Got invalid ltdb header length %d\n",
1118 data
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1119 data
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1122 fn(key
, data
, private_data
);
1129 This is used to canonicalize a ctdb_sock_addr structure.
1131 static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage
*in
,
1132 struct sockaddr_storage
*out
)
1134 memcpy(out
, in
, sizeof (*out
));
1137 if (in
->ss_family
== AF_INET6
) {
1138 const char prefix
[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
1139 const struct sockaddr_in6
*in6
=
1140 (const struct sockaddr_in6
*)in
;
1141 struct sockaddr_in
*out4
= (struct sockaddr_in
*)out
;
1142 if (memcmp(&in6
->sin6_addr
, prefix
, 12) == 0) {
1143 memset(out
, 0, sizeof(*out
));
1144 #ifdef HAVE_SOCK_SIN_LEN
1145 out4
->sin_len
= sizeof(*out
);
1147 out4
->sin_family
= AF_INET
;
1148 out4
->sin_port
= in6
->sin6_port
;
1149 memcpy(&out4
->sin_addr
, &in6
->sin6_addr
.s6_addr
[12], 4);
1156 * Register us as a server for a particular tcp connection
1159 int ctdbd_register_ips(struct ctdbd_connection
*conn
,
1160 const struct sockaddr_storage
*_server
,
1161 const struct sockaddr_storage
*_client
,
1162 int (*cb
)(struct tevent_context
*ev
,
1163 uint32_t src_vnn
, uint32_t dst_vnn
,
1165 const uint8_t *msg
, size_t msglen
,
1166 void *private_data
),
1169 struct ctdb_connection p
;
1170 TDB_DATA data
= { .dptr
= (uint8_t *)&p
, .dsize
= sizeof(p
) };
1172 struct sockaddr_storage client
;
1173 struct sockaddr_storage server
;
1176 * Only one connection so far
1179 smbd_ctdb_canonicalize_ip(_client
, &client
);
1180 smbd_ctdb_canonicalize_ip(_server
, &server
);
1182 switch (client
.ss_family
) {
1184 memcpy(&p
.dst
.ip
, &server
, sizeof(p
.dst
.ip
));
1185 memcpy(&p
.src
.ip
, &client
, sizeof(p
.src
.ip
));
1188 memcpy(&p
.dst
.ip6
, &server
, sizeof(p
.dst
.ip6
));
1189 memcpy(&p
.src
.ip6
, &client
, sizeof(p
.src
.ip6
));
1196 * We want to be told about IP releases
1199 ret
= register_with_ctdbd(conn
, CTDB_SRVID_RELEASE_IP
,
1206 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1207 * can send an extra ack to trigger a reset for our client, so it
1208 * immediately reconnects
1210 ret
= ctdbd_control(conn
, CTDB_CURRENT_NODE
,
1211 CTDB_CONTROL_TCP_CLIENT
, 0,
1212 CTDB_CTRL_FLAG_NOREPLY
, data
, NULL
, NULL
,
1221 call a control on the local node
1223 int ctdbd_control_local(struct ctdbd_connection
*conn
, uint32_t opcode
,
1224 uint64_t srvid
, uint32_t flags
, TDB_DATA data
,
1225 TALLOC_CTX
*mem_ctx
, TDB_DATA
*outdata
,
1228 return ctdbd_control(conn
, CTDB_CURRENT_NODE
, opcode
, srvid
, flags
, data
,
1229 mem_ctx
, outdata
, cstatus
);
1232 int ctdb_watch_us(struct ctdbd_connection
*conn
)
1234 struct ctdb_notify_data_old reg_data
;
1239 reg_data
.srvid
= CTDB_SRVID_SAMBA_NOTIFY
;
1241 reg_data
.notify_data
[0] = 0;
1243 struct_len
= offsetof(struct ctdb_notify_data_old
,
1244 notify_data
) + reg_data
.len
;
1246 ret
= ctdbd_control_local(
1247 conn
, CTDB_CONTROL_REGISTER_NOTIFY
, conn
->rand_srvid
, 0,
1248 make_tdb_data((uint8_t *)&reg_data
, struct_len
),
1249 NULL
, NULL
, &cstatus
);
1251 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1257 int ctdb_unwatch(struct ctdbd_connection
*conn
)
1259 uint64_t srvid
= CTDB_SRVID_SAMBA_NOTIFY
;
1263 ret
= ctdbd_control_local(
1264 conn
, CTDB_CONTROL_DEREGISTER_NOTIFY
, conn
->rand_srvid
, 0,
1265 make_tdb_data((uint8_t *)&srvid
, sizeof(srvid
)),
1266 NULL
, NULL
, &cstatus
);
1268 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1274 int ctdbd_probe(const char *sockname
, int timeout
)
1277 * Do a very early check if ctdbd is around to avoid an abort and core
1280 struct ctdbd_connection
*conn
= NULL
;
1283 ret
= ctdbd_init_connection(talloc_tos(), sockname
, timeout
,
1287 * We only care if we can connect.
1294 struct ctdb_pkt_send_state
{
1295 struct ctdb_pkt_send_state
*prev
, *next
;
1296 struct tevent_context
*ev
;
1297 struct ctdbd_connection
*conn
;
1299 /* ctdb request id */
1302 /* the associated tevent request */
1303 struct tevent_req
*req
;
1305 /* iovec array with data to send */
1310 /* Initial packet length */
1314 static void ctdb_pkt_send_cleanup(struct tevent_req
*req
,
1315 enum tevent_req_state req_state
);
1318 * Asynchronously send a ctdb packet given as iovec array
1320 * Note: the passed iov array is not const here. Similar
1321 * functions in samba take a const array and create a copy
1322 * before calling iov_advance() on the array.
1324 * This function will modify the iov array! But
1325 * this is a static function and our only caller
1326 * ctdbd_parse_send/recv is prepared for this to
1329 static struct tevent_req
*ctdb_pkt_send_send(TALLOC_CTX
*mem_ctx
,
1330 struct tevent_context
*ev
,
1331 struct ctdbd_connection
*conn
,
1335 enum dbwrap_req_state
*req_state
)
1337 struct tevent_req
*req
= NULL
;
1338 struct ctdb_pkt_send_state
*state
= NULL
;
1342 DBG_DEBUG("sending async ctdb reqid [%" PRIu32
"]\n", reqid
);
1344 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_pkt_send_state
);
1349 *state
= (struct ctdb_pkt_send_state
) {
1356 .packet_len
= iov_buflen(iov
, iovcnt
),
1359 tevent_req_set_cleanup_fn(req
, ctdb_pkt_send_cleanup
);
1361 *req_state
= DBWRAP_REQ_QUEUED
;
1363 if (ctdbd_conn_has_async_sends(conn
)) {
1365 * Can't attempt direct write with messages already queued and
1366 * possibly in progress
1368 DLIST_ADD_END(conn
->send_list
, state
);
1373 * Attempt a direct write. If this returns short, schedule the
1374 * remaining data as an async write, otherwise we're already done.
1377 nwritten
= writev(conn
->fd
, state
->iov
, state
->iovcnt
);
1378 if (nwritten
== state
->packet_len
) {
1379 DBG_DEBUG("Finished sending reqid [%" PRIu32
"]\n", reqid
);
1381 *req_state
= DBWRAP_REQ_DISPATCHED
;
1382 tevent_req_done(req
);
1383 return tevent_req_post(req
, ev
);
1386 if (nwritten
== -1) {
1387 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
1388 cluster_fatal("cluster write error\n");
1393 DBG_DEBUG("Posting async write of reqid [%" PRIu32
"]"
1394 "after short write [%zd]\n", reqid
, nwritten
);
1396 ok
= iov_advance(&state
->iov
, &state
->iovcnt
, nwritten
);
1398 *req_state
= DBWRAP_REQ_ERROR
;
1399 tevent_req_error(req
, EIO
);
1400 return tevent_req_post(req
, ev
);
1404 * As this is the first async write req we post, we must enable
1405 * fd-writable events.
1407 TEVENT_FD_WRITEABLE(conn
->fde
);
1408 DLIST_ADD_END(conn
->send_list
, state
);
1412 static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state
*state
)
1414 struct ctdbd_connection
*conn
= state
->conn
;
1420 if (state
->req
== NULL
) {
1421 DBG_DEBUG("Removing cancelled reqid [%" PRIu32
"]\n",
1424 DLIST_REMOVE(conn
->send_list
, state
);
1428 DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32
"]\n",
1431 talloc_reparent(state
->req
, conn
, state
);
1436 static void ctdb_pkt_send_cleanup(struct tevent_req
*req
,
1437 enum tevent_req_state req_state
)
1439 struct ctdb_pkt_send_state
*state
= tevent_req_data(
1440 req
, struct ctdb_pkt_send_state
);
1441 struct ctdbd_connection
*conn
= state
->conn
;
1442 size_t missing_len
= 0;
1448 missing_len
= iov_buflen(state
->iov
, state
->iovcnt
);
1449 if (state
->packet_len
== missing_len
) {
1451 * We haven't yet started sending this one, so we can just
1452 * remove it from the pending list
1456 if (missing_len
!= 0) {
1457 uint8_t *buf
= NULL
;
1459 if (req_state
!= TEVENT_REQ_RECEIVED
) {
1461 * Wait until the req_state is TEVENT_REQ_RECEIVED, as
1462 * that will be the final state when the request state
1463 * is talloc_free'd from tevent_req_received(), which
1464 * ensures we only run the following code *ONCE*!
1469 DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32
"]\n",
1472 * A request in progress of being sent. Reparent the iov buffer
1473 * so we can continue sending the request. See also the comment
1474 * in ctdbd_parse_send() when copying the key buffer.
1477 buf
= iov_concat(state
, state
->iov
, state
->iovcnt
);
1479 cluster_fatal("iov_concat error\n");
1484 state
->_iov
.iov_base
= buf
;
1485 state
->_iov
.iov_len
= missing_len
;
1486 state
->iov
= &state
->_iov
;
1488 talloc_set_destructor(state
, ctdb_pkt_send_state_destructor
);
1492 DBG_DEBUG("Removing pending reqid [%" PRIu32
"]\n", state
->reqid
);
1495 DLIST_REMOVE(conn
->send_list
, state
);
1497 if (!ctdbd_conn_has_async_sends(conn
)) {
1498 DBG_DEBUG("No more sends, disabling fd-writable events\n");
1499 TEVENT_FD_NOT_WRITEABLE(conn
->fde
);
1503 static int ctdb_pkt_send_handler(struct ctdbd_connection
*conn
)
1505 struct ctdb_pkt_send_state
*state
= NULL
;
1510 DBG_DEBUG("send handler\n");
1512 if (!ctdbd_conn_has_async_sends(conn
)) {
1513 DBG_WARNING("Writable fd-event without pending send\n");
1514 TEVENT_FD_NOT_WRITEABLE(conn
->fde
);
1518 state
= conn
->send_list
;
1519 iovlen
= iov_buflen(state
->iov
, state
->iovcnt
);
1521 nwritten
= writev(conn
->fd
, state
->iov
, state
->iovcnt
);
1522 if (nwritten
== -1) {
1523 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
1524 DBG_ERR("writev failed: %s\n", strerror(errno
));
1525 cluster_fatal("cluster write error\n");
1527 DBG_DEBUG("recoverable writev error, retry\n");
1531 if (nwritten
< iovlen
) {
1532 DBG_DEBUG("short write\n");
1534 ok
= iov_advance(&state
->iov
, &state
->iovcnt
, nwritten
);
1536 DBG_ERR("iov_advance failed\n");
1537 if (state
->req
== NULL
) {
1541 tevent_req_error(state
->req
, EIO
);
1547 if (state
->req
== NULL
) {
1548 DBG_DEBUG("Finished sending cancelled reqid [%" PRIu32
"]\n",
1554 DBG_DEBUG("Finished send request id [%" PRIu32
"]\n", state
->reqid
);
1556 tevent_req_done(state
->req
);
/*
 * Collect the result of a packet send request.
 *
 * Checks req with tevent_req_is_unix_error() and calls
 * tevent_req_received(req) on both the error and the success path.
 *
 * NOTE(review): the declaration of ret and the return statements are not
 * visible in this excerpt; presumably the errno stored in ret is returned on
 * error - confirm.
 */
1560 static int ctdb_pkt_send_recv(struct tevent_req
*req
)
1564 if (tevent_req_is_unix_error(req
, &ret
)) {
1565 tevent_req_received(req
);
1569 tevent_req_received(req
);
/*
 * State of one posted packet receive.
 *
 * ctdb_pkt_recv_send() appends an instance to conn->recv_list,
 * ctdb_pkt_recv_handler() matches incoming packets against state->reqid and
 * moves the received packet into hdr.
 */
1573 struct ctdb_pkt_recv_state
{
/* list linkage for conn->recv_list */
1574 struct ctdb_pkt_recv_state
*prev
, *next
;
/* event context, used by ctdbd_connection_destructor() to defer callbacks */
1575 struct tevent_context
*ev
;
/* owning connection; set to NULL by ctdbd_connection_destructor() */
1576 struct ctdbd_connection
*conn
;
1578 /* ctdb request id */
1581 /* the associated tevent_req */
1582 struct tevent_req
*req
;
1584 /* pointer to allocated ctdb packet buffer */
1585 struct ctdb_req_header
*hdr
;
/*
 * Forward declaration: ctdb_pkt_recv_send() registers this function as the
 * tevent cleanup function before its definition appears in the file.
 */
1588 static void ctdb_pkt_recv_cleanup(struct tevent_req
*req
,
1589 enum tevent_req_state req_state
);
/*
 * Post an asynchronous receive for a ctdb packet.
 *
 * Creates a tevent_req carrying a struct ctdb_pkt_recv_state, registers
 * ctdb_pkt_recv_cleanup() as its cleanup function and appends the state to
 * the end of conn->recv_list.
 *
 * mem_ctx: talloc parent handed to tevent_req_create()
 * ev:      tevent context
 * conn:    connection whose recv_list receives the new state
 *
 * NOTE(review): the remaining parameter(s), the fields of the state
 * initializer and the return statements are not visible in this excerpt.
 * state->reqid is logged below, so a request id is presumably passed in and
 * stored - confirm.
 */
1591 static struct tevent_req
*ctdb_pkt_recv_send(TALLOC_CTX
*mem_ctx
,
1592 struct tevent_context
*ev
,
1593 struct ctdbd_connection
*conn
,
1596 struct tevent_req
*req
= NULL
;
1597 struct ctdb_pkt_recv_state
*state
= NULL
;
1599 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_pkt_recv_state
);
1604 *state
= (struct ctdb_pkt_recv_state
) {
1611 tevent_req_set_cleanup_fn(req
, ctdb_pkt_recv_cleanup
);
1614 * fd-readable event is always set for the fde, no need to deal with
1618 DLIST_ADD_END(conn
->recv_list
, state
);
1619 DBG_DEBUG("Posted receive reqid [%" PRIu32
"]\n", state
->reqid
);
/*
 * tevent cleanup function for receive requests: takes the request state off
 * the recv_list of the connection it was posted on.
 *
 * req_state is not referenced in the visible lines.
 *
 * NOTE(review): the lines between fetching conn and DLIST_REMOVE() are not
 * visible in this excerpt. ctdbd_connection_destructor() sets state->conn to
 * NULL, so a NULL check presumably lives there - confirm.
 */
1624 static void ctdb_pkt_recv_cleanup(struct tevent_req
*req
,
1625 enum tevent_req_state req_state
)
1627 struct ctdb_pkt_recv_state
*state
= tevent_req_data(
1628 req
, struct ctdb_pkt_recv_state
);
1629 struct ctdbd_connection
*conn
= state
->conn
;
1635 DLIST_REMOVE(conn
->recv_list
, state
);
/*
 * Handle a readable event on the ctdb socket. Each call issues one readv()
 * and drives a small state machine kept in conn->read_state:
 *
 * 1. With no read in progress (read_state.iovs == NULL) the iovec is pointed
 *    at read_state.msglen, so the packet length word is read first.
 * 2. Once the length word is complete, a buffer of msglen bytes is allocated
 *    as read_state.hdr and a second read for the remaining
 *    msglen - sizeof(msglen) bytes is set up right behind the length field.
 * 3. Once the packet is complete, conn->recv_list is searched for a state
 *    with a matching reqid. On a match the packet is moved into state->hdr
 *    and the request is marked done; without a match the packet is logged,
 *    freed and read_state is reset.
 *
 * A readv() failure with an errno other than EINTR, EAGAIN or EWOULDBLOCK is
 * handed to cluster_fatal(); a short read advances the iovec via
 * iov_advance().
 *
 * NOTE(review): the local declarations (nread, iovlen, ok, readlen), the
 * conditions guarding the two cluster_fatal() calls, all return statements
 * and the closing braces are not visible in this excerpt, so the returned
 * values could not be confirmed.
 */
1638 static int ctdb_pkt_recv_handler(struct ctdbd_connection
*conn
)
1640 struct ctdb_pkt_recv_state
*state
= NULL
;
1645 DBG_DEBUG("receive handler\n");
/* No read in progress: start with the length word of the next packet. */
1647 if (conn
->read_state
.iovs
== NULL
) {
1648 conn
->read_state
.iov
.iov_base
= &conn
->read_state
.msglen
;
1649 conn
->read_state
.iov
.iov_len
= sizeof(conn
->read_state
.msglen
);
1650 conn
->read_state
.iovs
= &conn
->read_state
.iov
;
1651 conn
->read_state
.iovcnt
= 1;
1654 iovlen
= iov_buflen(conn
->read_state
.iovs
, conn
->read_state
.iovcnt
);
1656 DBG_DEBUG("iovlen [%zd]\n", iovlen
);
1658 nread
= readv(conn
->fd
, conn
->read_state
.iovs
, conn
->read_state
.iovcnt
);
1660 cluster_fatal("cluster read error, peer closed connection\n");
1663 if (errno
!= EINTR
&& errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
1664 cluster_fatal("cluster read error\n");
1666 DBG_DEBUG("recoverable error from readv, retry\n");
/* Partial read: remember how far we got in the current iovec. */
1670 if (nread
< iovlen
) {
1671 DBG_DEBUG("iovlen [%zd] nread [%zd]\n", iovlen
, nread
);
1672 ok
= iov_advance(&conn
->read_state
.iovs
,
1673 &conn
->read_state
.iovcnt
,
/* The current read target has been filled completely. */
1681 conn
->read_state
.iovs
= NULL
;
1682 conn
->read_state
.iovcnt
= 0;
1684 if (conn
->read_state
.hdr
== NULL
) {
1686 * Going this way after reading the 4 initial byte message
1689 uint32_t msglen
= conn
->read_state
.msglen
;
1690 uint8_t *readbuf
= NULL
;
1693 DBG_DEBUG("msglen: %" PRIu32
"\n", msglen
);
/* A packet can never be smaller than its fixed header. */
1695 if (msglen
< sizeof(struct ctdb_req_header
)) {
1696 DBG_ERR("short message %" PRIu32
"\n", msglen
);
1700 conn
->read_state
.hdr
= talloc_size(conn
, msglen
);
1701 if (conn
->read_state
.hdr
== NULL
) {
1704 conn
->read_state
.hdr
->length
= msglen
;
1705 talloc_set_name_const(conn
->read_state
.hdr
,
1706 "struct ctdb_req_header");
/* The length word is already stored; read the rest right behind it. */
1708 readbuf
= (uint8_t *)conn
->read_state
.hdr
+ sizeof(msglen
);
1709 readlen
= msglen
- sizeof(msglen
);
1711 conn
->read_state
.iov
.iov_base
= readbuf
;
1712 conn
->read_state
.iov
.iov_len
= readlen
;
1713 conn
->read_state
.iovs
= &conn
->read_state
.iov
;
1714 conn
->read_state
.iovcnt
= 1;
1716 DBG_DEBUG("Scheduled packet read size %zd\n", readlen
);
1721 * Searching a list here is expected to be cheap, as messages are
1722 * exepcted to be coming in more or less ordered and we should find the
1723 * waiting request near the beginning of the list.
1725 for (state
= conn
->recv_list
; state
!= NULL
; state
= state
->next
) {
1726 if (state
->reqid
== conn
->read_state
.hdr
->reqid
) {
/* Nobody is waiting for this reqid: drop the packet and start over. */
1731 if (state
== NULL
) {
1732 DBG_ERR("Discarding async ctdb reqid %u\n",
1733 conn
->read_state
.hdr
->reqid
);
1734 TALLOC_FREE(conn
->read_state
.hdr
);
1735 ZERO_STRUCT(conn
->read_state
);
1739 DBG_DEBUG("Got reply for reqid [%" PRIu32
"]\n", state
->reqid
);
/* Hand the packet over to the waiting request and reset the read state. */
1741 state
->hdr
= talloc_move(state
, &conn
->read_state
.hdr
);
1742 ZERO_STRUCT(conn
->read_state
);
1743 tevent_req_done(state
->req
);
/*
 * Collect the result of a packet receive request.
 *
 * req:     the request created by ctdb_pkt_recv_send()
 * mem_ctx: talloc context the received packet is moved to
 * _hdr:    on success set to the received packet (moved out of state->hdr)
 *
 * On a unix error the error is logged and tevent_req_received(req) is
 * called; *_hdr is not assigned in the visible lines of that path.
 *
 * NOTE(review): the declaration of error and the return statements are not
 * visible in this excerpt; presumably the errno is returned on error -
 * confirm.
 */
1747 static int ctdb_pkt_recv_recv(struct tevent_req
*req
,
1748 TALLOC_CTX
*mem_ctx
,
1749 struct ctdb_req_header
**_hdr
)
1751 struct ctdb_pkt_recv_state
*state
= tevent_req_data(
1752 req
, struct ctdb_pkt_recv_state
);
1755 if (tevent_req_is_unix_error(req
, &error
)) {
1756 DBG_ERR("ctdb_read_req failed %s\n", strerror(error
));
1757 tevent_req_received(req
);
/* Move the packet to the caller's context before the state goes away. */
1761 *_hdr
= talloc_move(mem_ctx
, &state
->hdr
);
1763 tevent_req_received(req
);
/*
 * Tear down a ctdbd connection.
 *
 * Frees the fd event and any partially received packet, resets read_state,
 * then fails every request still queued on send_list and recv_list with EIO.
 * Each queued state is unlinked and gets conn = NULL before it is failed;
 * tevent_req_defer_callback() is called on the request first, using the
 * event context stored in the state.
 *
 * NOTE(review): original lines between freeing c->fde and freeing
 * c->read_state.hdr, as well as the return statement, are not visible in
 * this excerpt. Judging by the name this is installed as a talloc
 * destructor - confirm.
 */
1767 static int ctdbd_connection_destructor(struct ctdbd_connection
*c
)
1769 TALLOC_FREE(c
->fde
);
1775 TALLOC_FREE(c
->read_state
.hdr
);
1776 ZERO_STRUCT(c
->read_state
);
/* Fail all pending sends. */
1778 while (c
->send_list
!= NULL
) {
1779 struct ctdb_pkt_send_state
*send_state
= c
->send_list
;
1780 DLIST_REMOVE(c
->send_list
, send_state
);
1781 send_state
->conn
= NULL
;
1782 tevent_req_defer_callback(send_state
->req
, send_state
->ev
);
1783 tevent_req_error(send_state
->req
, EIO
);
/* Fail all pending receives. */
1786 while (c
->recv_list
!= NULL
) {
1787 struct ctdb_pkt_recv_state
*recv_state
= c
->recv_list
;
1788 DLIST_REMOVE(c
->recv_list
, recv_state
);
1789 recv_state
->conn
= NULL
;
1790 tevent_req_defer_callback(recv_state
->req
, recv_state
->ev
);
1791 tevent_req_error(recv_state
->req
, EIO
);
/*
 * State of one asynchronous fetch-and-parse request, see ctdbd_parse_send().
 *
 * NOTE(review): some members used by the code (for example reqid, key and
 * private_data) are not visible in this excerpt.
 */
1797 struct ctdbd_parse_state
{
1798 struct tevent_context
*ev
;
1799 struct ctdbd_connection
*conn
;
/* inline storage for keys of up to sizeof(_keybuf) bytes */
1802 uint8_t _keybuf
[64];
/* fixed part of the request packet, sent as iov[0] */
1803 struct ctdb_req_call_old ctdb_req
;
/* iov[0]: request header, iov[1]: key bytes */
1804 struct iovec iov
[2];
/* caller supplied callback, invoked from ctdbd_parse_done() */
1805 void (*parser
)(TDB_DATA key
,
1807 void *private_data
);
/* caller owned progress indicator, updated as the request advances */
1809 enum dbwrap_req_state
*req_state
;
/*
 * Forward declarations of the two completion callbacks of the parse
 * request: the first runs after the request packet has been sent, the
 * second after the reply packet has been received.
 */
1812 static void ctdbd_parse_pkt_send_done(struct tevent_req
*subreq
);
1813 static void ctdbd_parse_done(struct tevent_req
*subreq
);
/*
 * Start an asynchronous CTDB_REQ_CALL / CTDB_FETCH_FUNC request for a key
 * and arrange for the caller's parser to be run on the reply.
 *
 * The request packet is sent as two iovec elements: the fixed part of
 * struct ctdb_req_call_old (everything up to its data member) followed by
 * the key bytes. The key is copied into the request state first: keys of up
 * to sizeof(state->_keybuf) bytes go into the inline buffer, larger ones are
 * duplicated with talloc_memdup(). The packet is handed to
 * ctdb_pkt_send_send() and ctdbd_parse_pkt_send_done() is set as callback.
 *
 * mem_ctx:      talloc parent handed to tevent_req_create()
 * ev:           tevent context, also used for tevent_req_post()
 * conn:         ctdbd connection; also provides the request id via
 *               ctdbd_next_reqid()
 * parser:       callback for the fetched record
 * private_data: stored in the state for the parser call
 * req_state:    set to DBWRAP_REQ_ERROR on the visible failure paths
 *
 * The code also uses db_id, key and local_copy (local_copy selects
 * CTDB_WANT_READONLY); these are presumably further parameters.
 *
 * NOTE(review): the declarations of those parameters, parts of the state
 * initializer, most arguments of ctdb_pkt_send_send(), the final return and
 * the closing braces are not visible in this excerpt.
 */
1815 struct tevent_req
*ctdbd_parse_send(TALLOC_CTX
*mem_ctx
,
1816 struct tevent_context
*ev
,
1817 struct ctdbd_connection
*conn
,
1821 void (*parser
)(TDB_DATA key
,
1823 void *private_data
),
1825 enum dbwrap_req_state
*req_state
)
1827 struct tevent_req
*req
= NULL
;
1828 struct ctdbd_parse_state
*state
= NULL
;
1830 uint32_t packet_length
;
1831 struct tevent_req
*subreq
= NULL
;
1833 req
= tevent_req_create(mem_ctx
, &state
, struct ctdbd_parse_state
);
1835 *req_state
= DBWRAP_REQ_ERROR
;
1839 *state
= (struct ctdbd_parse_state
) {
1842 .reqid
= ctdbd_next_reqid(conn
),
1844 .private_data
= private_data
,
1845 .req_state
= req_state
,
1848 flags
= local_copy
? CTDB_WANT_READONLY
: 0;
/* Total packet size: fixed request part plus the key bytes. */
1849 packet_length
= offsetof(struct ctdb_req_call_old
, data
) + key
.dsize
;
1852 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires that
1853 * all passed iov elements have a lifetime longer that the tevent_req
1854 * returned by ctdb_pkt_send_send(). This is required continue sending a
1855 * the low level request into the ctdb socket, if a higher level
1856 * ('this') request is canceled (or talloc free'd) by the application
1857 * layer, without sending invalid packets to ctdb.
1859 if (key
.dsize
> sizeof(state
->_keybuf
)) {
1860 state
->key
.dptr
= talloc_memdup(state
, key
.dptr
, key
.dsize
);
1861 if (tevent_req_nomem(state
->key
.dptr
, req
)) {
1862 return tevent_req_post(req
, ev
);
1865 memcpy(state
->_keybuf
, key
.dptr
, key
.dsize
);
1866 state
->key
.dptr
= state
->_keybuf
;
1868 state
->key
.dsize
= key
.dsize
;
/* Fill in the fixed part of the request packet. */
1870 state
->ctdb_req
.hdr
.length
= packet_length
;
1871 state
->ctdb_req
.hdr
.ctdb_magic
= CTDB_MAGIC
;
1872 state
->ctdb_req
.hdr
.ctdb_version
= CTDB_PROTOCOL
;
1873 state
->ctdb_req
.hdr
.operation
= CTDB_REQ_CALL
;
1874 state
->ctdb_req
.hdr
.reqid
= state
->reqid
;
1875 state
->ctdb_req
.flags
= flags
;
1876 state
->ctdb_req
.callid
= CTDB_FETCH_FUNC
;
1877 state
->ctdb_req
.db_id
= db_id
;
1878 state
->ctdb_req
.keylen
= state
->key
.dsize
;
/* iov[0] carries the fixed request part, iov[1] the copied key. */
1880 state
->iov
[0].iov_base
= &state
->ctdb_req
;
1881 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_call_old
, data
);
1882 state
->iov
[1].iov_base
= state
->key
.dptr
;
1883 state
->iov
[1].iov_len
= state
->key
.dsize
;
1886 * Note that ctdb_pkt_send_send()
1887 * will modify state->iov using
1888 * iov_advance() without making a copy.
1890 subreq
= ctdb_pkt_send_send(state
,
1895 ARRAY_SIZE(state
->iov
),
1897 if (tevent_req_nomem(subreq
, req
)) {
1898 *req_state
= DBWRAP_REQ_ERROR
;
1899 return tevent_req_post(req
, ev
);
1901 tevent_req_set_callback(subreq
, ctdbd_parse_pkt_send_done
, req
);
/*
 * Completion callback for the packet send started by ctdbd_parse_send().
 *
 * Collects the send result and frees the subrequest. A send error fails the
 * parse request. Otherwise a receive is posted via ctdb_pkt_recv_send(),
 * the caller's *req_state is set to DBWRAP_REQ_DISPATCHED and
 * ctdbd_parse_done() is set as callback of the receive.
 *
 * NOTE(review): the declaration of ret, most arguments of
 * ctdb_pkt_recv_send() and the return statements are not visible in this
 * excerpt.
 */
1906 static void ctdbd_parse_pkt_send_done(struct tevent_req
*subreq
)
1908 struct tevent_req
*req
= tevent_req_callback_data(
1909 subreq
, struct tevent_req
);
1910 struct ctdbd_parse_state
*state
= tevent_req_data(
1911 req
, struct ctdbd_parse_state
);
1914 ret
= ctdb_pkt_send_recv(subreq
);
1915 TALLOC_FREE(subreq
);
1916 if (tevent_req_error(req
, ret
)) {
1917 DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret
));
/* The request is on the wire, now wait for the reply. */
1921 subreq
= ctdb_pkt_recv_send(state
,
1925 if (tevent_req_nomem(subreq
, req
)) {
1929 *state
->req_state
= DBWRAP_REQ_DISPATCHED
;
1930 tevent_req_set_callback(subreq
, ctdbd_parse_done
, req
);
/*
 * Completion callback for the reply packet of a parse request.
 *
 * Collects the received packet (moved onto the request state) and frees the
 * subrequest. The parse request fails with
 * - the receive error, if receiving failed,
 * - EIO, if the packet is not a CTDB_REPLY_CALL (the packet is dumped),
 * - ENOENT, if the reply carries no data.
 * Otherwise the caller's parser is invoked with the stored key and the
 * reply data, and the request is marked done.
 *
 * NOTE(review): reply->datalen is taken from the packet and passed on
 * without a visible check against hdr->length - confirm that the length is
 * validated elsewhere or can be trusted.
 * NOTE(review): the declaration of ret and the return statements are not
 * visible in this excerpt.
 */
1934 static void ctdbd_parse_done(struct tevent_req
*subreq
)
1936 struct tevent_req
*req
= tevent_req_callback_data(
1937 subreq
, struct tevent_req
);
1938 struct ctdbd_parse_state
*state
= tevent_req_data(
1939 req
, struct ctdbd_parse_state
);
1940 struct ctdb_req_header
*hdr
= NULL
;
1941 struct ctdb_reply_call_old
*reply
= NULL
;
1944 DBG_DEBUG("async parse request finished\n");
1946 ret
= ctdb_pkt_recv_recv(subreq
, state
, &hdr
);
1947 TALLOC_FREE(subreq
);
1948 if (tevent_req_error(req
, ret
)) {
1949 DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret
));
/* Only a call reply is a valid answer to our request. */
1953 if (hdr
->operation
!= CTDB_REPLY_CALL
) {
1954 DBG_ERR("received invalid reply\n");
1955 ctdb_packet_dump(hdr
);
1956 tevent_req_error(req
, EIO
);
1960 reply
= (struct ctdb_reply_call_old
*)hdr
;
1962 if (reply
->datalen
== 0) {
1964 * Treat an empty record as non-existing
1966 tevent_req_error(req
, ENOENT
);
/* Hand the record to the caller's parser. */
1970 state
->parser(state
->key
,
1971 make_tdb_data(&reply
->data
[0], reply
->datalen
),
1972 state
->private_data
);
1974 tevent_req_done(req
);
/*
 * Collect the result of a request started with ctdbd_parse_send().
 *
 * Checks req with tevent_req_is_unix_error(), logs the error if there is
 * one, and calls tevent_req_received(req) on both paths.
 *
 * NOTE(review): the declaration of error and the return statements are not
 * visible in this excerpt; presumably the errno is returned on error -
 * confirm.
 */
1978 int ctdbd_parse_recv(struct tevent_req
*req
)
1982 if (tevent_req_is_unix_error(req
, &error
)) {
1983 DBG_DEBUG("async parse returned %s\n", strerror(error
));
1984 tevent_req_received(req
);
1988 tevent_req_received(req
);