2 Unix SMB/CIFS implementation.
3 Samba3 ctdb connection handling
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
26 #include <ctdb_protocol.h>
28 #include "lib/async_req/async_sock.h"
32 struct tevent_queue
*outqueue
;
35 struct ctdb_conn_init_state
{
36 struct sockaddr_un addr
;
37 struct ctdb_conn
*conn
;
41 * use the callbacks of async_connect_send to make sure
42 * we are connecting to CTDB as root
44 static void before_connect_cb(void *private_data
) {
48 static void after_connect_cb(void *private_data
) {
52 static void ctdb_conn_init_done(struct tevent_req
*subreq
);
53 static int ctdb_conn_destructor(struct ctdb_conn
*conn
);
55 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
56 struct tevent_context
*ev
,
59 struct tevent_req
*req
, *subreq
;
60 struct ctdb_conn_init_state
*state
;
63 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_conn_init_state
);
68 if (!lp_clustering()) {
69 tevent_req_error(req
, ENOSYS
);
70 return tevent_req_post(req
, ev
);
73 state
->conn
= talloc(state
, struct ctdb_conn
);
74 if (tevent_req_nomem(state
->conn
, req
)) {
75 return tevent_req_post(req
, ev
);
78 state
->conn
->outqueue
= tevent_queue_create(
79 state
->conn
, "ctdb outqueue");
80 if (tevent_req_nomem(state
->conn
->outqueue
, req
)) {
81 return tevent_req_post(req
, ev
);
84 state
->conn
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
85 if (state
->conn
->fd
== -1) {
86 tevent_req_error(req
, errno
);
87 return tevent_req_post(req
, ev
);
89 talloc_set_destructor(state
->conn
, ctdb_conn_destructor
);
91 state
->addr
.sun_family
= AF_UNIX
;
93 len
= strlcpy(state
->addr
.sun_path
, sock
,
94 sizeof(state
->addr
.sun_path
));
95 if (len
>= sizeof(state
->addr
.sun_path
)) {
96 tevent_req_error(req
, ENAMETOOLONG
);
97 return tevent_req_post(req
, ev
);
100 subreq
= async_connect_send(state
, ev
, state
->conn
->fd
,
101 (struct sockaddr
*)&state
->addr
,
102 sizeof(state
->addr
), before_connect_cb
,
103 after_connect_cb
, NULL
);
104 if (tevent_req_nomem(subreq
, req
)) {
105 return tevent_req_post(req
, ev
);
107 tevent_req_set_callback(subreq
, ctdb_conn_init_done
, req
);
111 static int ctdb_conn_destructor(struct ctdb_conn
*c
)
120 static void ctdb_conn_init_done(struct tevent_req
*subreq
)
122 struct tevent_req
*req
= tevent_req_callback_data(
123 subreq
, struct tevent_req
);
126 ret
= async_connect_recv(subreq
, &err
);
129 tevent_req_error(req
, err
);
132 tevent_req_done(req
);
135 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
136 struct ctdb_conn
**pconn
)
138 struct ctdb_conn_init_state
*state
= tevent_req_data(
139 req
, struct ctdb_conn_init_state
);
142 if (tevent_req_is_unix_error(req
, &err
)) {
145 *pconn
= talloc_move(mem_ctx
, &state
->conn
);
150 struct ctdb_conn_control_state
{
151 struct tevent_context
*ev
;
152 struct ctdb_conn
*conn
;
153 struct ctdb_req_control req
;
155 struct ctdb_reply_control
*reply
;
158 static void ctdb_conn_control_written(struct tevent_req
*subreq
);
159 static void ctdb_conn_control_done(struct tevent_req
*subreq
);
160 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
);
162 struct tevent_req
*ctdb_conn_control_send(TALLOC_CTX
*mem_ctx
,
163 struct tevent_context
*ev
,
164 struct ctdb_conn
*conn
,
165 uint32_t vnn
, uint32_t opcode
,
166 uint64_t srvid
, uint32_t flags
,
167 uint8_t *data
, size_t datalen
)
169 struct tevent_req
*req
, *subreq
;
170 struct ctdb_conn_control_state
*state
;
171 struct ctdb_req_header
*hdr
;
173 req
= tevent_req_create(mem_ctx
, &state
,
174 struct ctdb_conn_control_state
);
181 hdr
= &state
->req
.hdr
;
182 hdr
->length
= offsetof(struct ctdb_req_control
, data
) + datalen
;
183 hdr
->ctdb_magic
= CTDB_MAGIC
;
184 hdr
->ctdb_version
= CTDB_PROTOCOL
;
185 hdr
->operation
= CTDB_REQ_CONTROL
;
186 hdr
->reqid
= 1; /* FIXME */
188 state
->req
.opcode
= opcode
;
189 state
->req
.srvid
= srvid
;
190 state
->req
.datalen
= datalen
;
191 state
->req
.flags
= flags
;
193 state
->iov
[0].iov_base
= &state
->req
;
194 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_control
, data
);
195 state
->iov
[1].iov_base
= data
;
196 state
->iov
[1].iov_len
= datalen
;
198 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
200 if (tevent_req_nomem(subreq
, req
)) {
201 return tevent_req_post(req
, ev
);
203 tevent_req_set_callback(subreq
, ctdb_conn_control_written
, req
);
207 static void ctdb_conn_control_written(struct tevent_req
*subreq
)
209 struct tevent_req
*req
= tevent_req_callback_data(
210 subreq
, struct tevent_req
);
211 struct ctdb_conn_control_state
*state
= tevent_req_data(
212 req
, struct ctdb_conn_control_state
);
216 written
= writev_recv(subreq
, &err
);
219 tevent_req_error(req
, err
);
222 subreq
= read_packet_send(
223 state
, state
->ev
, state
->conn
->fd
, sizeof(uint32_t),
224 ctdb_packet_more
, NULL
);
225 if (tevent_req_nomem(subreq
, req
)) {
228 tevent_req_set_callback(subreq
, ctdb_conn_control_done
, req
);
231 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
)
235 if (buflen
> sizeof(uint32_t)) {
236 /* Been here, done */
239 memcpy(&len
, buf
, sizeof(len
));
241 if (len
< sizeof(uint32_t)) {
245 return (len
- sizeof(uint32_t));
248 static void ctdb_conn_control_done(struct tevent_req
*subreq
)
250 struct tevent_req
*req
= tevent_req_callback_data(
251 subreq
, struct tevent_req
);
252 struct ctdb_conn_control_state
*state
= tevent_req_data(
253 req
, struct ctdb_conn_control_state
);
258 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
261 tevent_req_error(req
, err
);
264 state
->reply
= (struct ctdb_reply_control
*)buf
;
265 tevent_req_done(req
);
268 int ctdb_conn_control_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
269 struct ctdb_reply_control
**preply
)
271 struct ctdb_conn_control_state
*state
= tevent_req_data(
272 req
, struct ctdb_conn_control_state
);
275 if (tevent_req_is_unix_error(req
, &err
)) {
278 if (preply
!= NULL
) {
279 *preply
= talloc_move(mem_ctx
, &state
->reply
);
284 struct ctdb_conn_msg_write_state
{
285 struct ctdb_req_message ctdb_msg
;
289 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
);
291 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
292 struct tevent_context
*ev
,
293 struct ctdb_conn
*conn
,
294 uint32_t vnn
, uint64_t srvid
,
295 uint8_t *msg
, size_t msg_len
)
297 struct tevent_req
*req
, *subreq
;
298 struct ctdb_conn_msg_write_state
*state
;
299 struct ctdb_req_header
*h
;
301 req
= tevent_req_create(mem_ctx
, &state
,
302 struct ctdb_conn_msg_write_state
);
307 h
= &state
->ctdb_msg
.hdr
;
309 h
->length
= offsetof(struct ctdb_req_message
, data
) + msg_len
;
310 h
->ctdb_magic
= CTDB_MAGIC
;
311 h
->ctdb_version
= CTDB_PROTOCOL
;
313 h
->operation
= CTDB_REQ_MESSAGE
;
315 h
->srcnode
= CTDB_CURRENT_NODE
;
317 state
->ctdb_msg
.srvid
= srvid
;
318 state
->ctdb_msg
.datalen
= msg_len
;
320 state
->iov
[0].iov_base
= &state
->ctdb_msg
;
321 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_message
, data
);
322 state
->iov
[1].iov_base
= msg
;
323 state
->iov
[1].iov_len
= msg_len
;
325 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
327 if (tevent_req_nomem(subreq
, req
)) {
328 return tevent_req_post(req
, ev
);
330 tevent_req_set_callback(subreq
, ctdb_conn_msg_write_done
, req
);
334 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
)
336 struct tevent_req
*req
= tevent_req_callback_data(
337 subreq
, struct tevent_req
);
341 written
= writev_recv(subreq
, &err
);
344 tevent_req_error(req
, err
);
347 tevent_req_done(req
);
350 int ctdb_conn_msg_write_recv(struct tevent_req
*req
)
352 return tevent_req_simple_recv_unix(req
);
355 struct ctdb_msg_channel
{
356 struct ctdb_conn
*conn
;
359 struct ctdb_msg_channel_init_state
{
360 struct tevent_context
*ev
;
361 struct ctdb_conn
*conn
;
363 struct ctdb_msg_channel
*channel
;
366 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
);
367 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
);
369 struct tevent_req
*ctdb_msg_channel_init_send(
370 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
371 const char *sock
, uint64_t srvid
)
373 struct tevent_req
*req
, *subreq
;
374 struct ctdb_msg_channel_init_state
*state
;
376 req
= tevent_req_create(mem_ctx
, &state
,
377 struct ctdb_msg_channel_init_state
);
382 state
->srvid
= srvid
;
384 subreq
= ctdb_conn_init_send(state
, ev
, sock
);
385 if (tevent_req_nomem(subreq
, req
)) {
386 return tevent_req_post(req
, ev
);
388 tevent_req_set_callback(subreq
, ctdb_msg_channel_init_connected
, req
);
392 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
)
394 struct tevent_req
*req
= tevent_req_callback_data(
395 subreq
, struct tevent_req
);
396 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
397 req
, struct ctdb_msg_channel_init_state
);
400 ret
= ctdb_conn_init_recv(subreq
, state
, &state
->conn
);
402 if (tevent_req_error(req
, ret
)) {
405 subreq
= ctdb_conn_control_send(state
, state
->ev
, state
->conn
,
407 CTDB_CONTROL_REGISTER_SRVID
,
408 state
->srvid
, 0, NULL
, 0);
409 if (tevent_req_nomem(subreq
, req
)) {
412 tevent_req_set_callback(
413 subreq
, ctdb_msg_channel_init_registered_srvid
, req
);
416 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
)
418 struct tevent_req
*req
= tevent_req_callback_data(
419 subreq
, struct tevent_req
);
420 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
421 req
, struct ctdb_msg_channel_init_state
);
422 struct ctdb_reply_control
*reply
;
425 ret
= ctdb_conn_control_recv(subreq
, talloc_tos(), &reply
);
427 if (tevent_req_error(req
, ret
)) {
430 if (reply
->status
!= 0) {
431 tevent_req_error(req
, EIO
);
434 state
->channel
= talloc(state
, struct ctdb_msg_channel
);
435 if (tevent_req_nomem(state
->channel
, req
)) {
438 state
->channel
->conn
= talloc_move(state
->channel
, &state
->conn
);
439 tevent_req_done(req
);
442 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
443 struct ctdb_msg_channel
**pchannel
)
445 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
446 req
, struct ctdb_msg_channel_init_state
);
449 if (tevent_req_is_unix_error(req
, &err
)) {
452 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
456 struct ctdb_msg_read_state
{
461 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
);
463 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
464 struct tevent_context
*ev
,
465 struct ctdb_msg_channel
*channel
)
467 struct tevent_req
*req
, *subreq
;
468 struct ctdb_msg_read_state
*state
;
470 req
= tevent_req_create(mem_ctx
, &state
,
471 struct ctdb_msg_read_state
);
475 subreq
= read_packet_send(state
, ev
, channel
->conn
->fd
,
476 sizeof(uint32_t), ctdb_packet_more
, NULL
);
477 if (tevent_req_nomem(subreq
, req
)) {
478 return tevent_req_post(req
, ev
);
480 tevent_req_set_callback(subreq
, ctdb_msg_channel_got_msg
, req
);
484 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
)
486 struct tevent_req
*req
= tevent_req_callback_data(
487 subreq
, struct tevent_req
);
488 struct ctdb_msg_read_state
*state
= tevent_req_data(
489 req
, struct ctdb_msg_read_state
);
494 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
496 tevent_req_error(req
, err
);
499 state
->buflen
= nread
;
501 tevent_req_done(req
);
504 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
505 uint8_t **pmsg
, size_t *pmsg_len
)
507 struct ctdb_msg_read_state
*state
= tevent_req_data(
508 req
, struct ctdb_msg_read_state
);
509 struct ctdb_req_header
*hdr
;
510 struct ctdb_req_message
*msg
;
514 if (tevent_req_is_unix_error(req
, &err
)) {
518 hdr
= (struct ctdb_req_header
*)state
->buf
;
519 if (hdr
->length
!= state
->buflen
) {
520 DEBUG(10, ("Got invalid header length\n"));
523 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
524 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
525 CTDB_REQ_MESSAGE
, (int)hdr
->operation
));
528 if (hdr
->length
< offsetof(struct ctdb_req_message
, data
)) {
529 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr
->length
));
533 msg
= (struct ctdb_req_message
*)hdr
;
535 hdr
->length
- offsetof(struct ctdb_req_message
, data
)) {
536 DEBUG(10, ("Got invalid datalen %d\n", (int)msg
->datalen
));
540 buf
= (uint8_t *)talloc_memdup(mem_ctx
, msg
->data
, msg
->datalen
);
545 *pmsg_len
= msg
->datalen
;