/*
   Unix SMB/CIFS implementation.
   Samba3 ctdb connection handling
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
24 #ifdef CLUSTER_SUPPORT
26 #include "lib/async_req/async_sock.h"
30 struct tevent_queue
*outqueue
;
33 struct ctdb_conn_init_state
{
34 struct sockaddr_un addr
;
35 struct ctdb_conn
*conn
;
/*
 * Use the callbacks of async_connect_send to make sure
 * we are connecting to CTDB as root.
 */
42 static void before_connect_cb(void *private_data
) {
46 static void after_connect_cb(void *private_data
) {
50 static void ctdb_conn_init_done(struct tevent_req
*subreq
);
51 static int ctdb_conn_destructor(struct ctdb_conn
*conn
);
53 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
54 struct tevent_context
*ev
,
57 struct tevent_req
*req
, *subreq
;
58 struct ctdb_conn_init_state
*state
;
60 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_conn_init_state
);
65 if (!lp_clustering()) {
66 tevent_req_error(req
, ENOSYS
);
67 return tevent_req_post(req
, ev
);
70 if (strlen(sock
) >= sizeof(state
->addr
.sun_path
)) {
71 tevent_req_error(req
, ENAMETOOLONG
);
72 return tevent_req_post(req
, ev
);
75 state
->conn
= talloc(state
, struct ctdb_conn
);
76 if (tevent_req_nomem(state
->conn
, req
)) {
77 return tevent_req_post(req
, ev
);
80 state
->conn
->outqueue
= tevent_queue_create(
81 state
->conn
, "ctdb outqueue");
82 if (tevent_req_nomem(state
->conn
->outqueue
, req
)) {
83 return tevent_req_post(req
, ev
);
86 state
->conn
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
87 if (state
->conn
->fd
== -1) {
88 tevent_req_error(req
, errno
);
89 return tevent_req_post(req
, ev
);
91 talloc_set_destructor(state
->conn
, ctdb_conn_destructor
);
93 state
->addr
.sun_family
= AF_UNIX
;
94 strncpy(state
->addr
.sun_path
, sock
, sizeof(state
->addr
.sun_path
));
96 subreq
= async_connect_send(state
, ev
, state
->conn
->fd
,
97 (struct sockaddr
*)&state
->addr
,
98 sizeof(state
->addr
), before_connect_cb
,
99 after_connect_cb
, NULL
);
100 if (tevent_req_nomem(subreq
, req
)) {
101 return tevent_req_post(req
, ev
);
103 tevent_req_set_callback(subreq
, ctdb_conn_init_done
, req
);
107 static int ctdb_conn_destructor(struct ctdb_conn
*c
)
116 static void ctdb_conn_init_done(struct tevent_req
*subreq
)
118 struct tevent_req
*req
= tevent_req_callback_data(
119 subreq
, struct tevent_req
);
122 ret
= async_connect_recv(subreq
, &err
);
125 tevent_req_error(req
, err
);
128 tevent_req_done(req
);
131 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
132 struct ctdb_conn
**pconn
)
134 struct ctdb_conn_init_state
*state
= tevent_req_data(
135 req
, struct ctdb_conn_init_state
);
138 if (tevent_req_is_unix_error(req
, &err
)) {
141 *pconn
= talloc_move(mem_ctx
, &state
->conn
);
146 struct ctdb_conn_control_state
{
147 struct tevent_context
*ev
;
148 struct ctdb_conn
*conn
;
149 struct ctdb_req_control req
;
151 struct ctdb_reply_control
*reply
;
154 static void ctdb_conn_control_written(struct tevent_req
*subreq
);
155 static void ctdb_conn_control_done(struct tevent_req
*subreq
);
156 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
);
158 struct tevent_req
*ctdb_conn_control_send(TALLOC_CTX
*mem_ctx
,
159 struct tevent_context
*ev
,
160 struct ctdb_conn
*conn
,
161 uint32_t vnn
, uint32_t opcode
,
162 uint64_t srvid
, uint32_t flags
,
163 uint8_t *data
, size_t datalen
)
165 struct tevent_req
*req
, *subreq
;
166 struct ctdb_conn_control_state
*state
;
167 struct ctdb_req_header
*hdr
;
169 req
= tevent_req_create(mem_ctx
, &state
,
170 struct ctdb_conn_control_state
);
177 hdr
= &state
->req
.hdr
;
178 hdr
->length
= offsetof(struct ctdb_req_control
, data
) + datalen
;
179 hdr
->ctdb_magic
= CTDB_MAGIC
;
180 hdr
->ctdb_version
= CTDB_VERSION
;
181 hdr
->operation
= CTDB_REQ_CONTROL
;
182 hdr
->reqid
= 1; /* FIXME */
184 state
->req
.opcode
= opcode
;
185 state
->req
.srvid
= srvid
;
186 state
->req
.datalen
= datalen
;
187 state
->req
.flags
= flags
;
189 state
->iov
[0].iov_base
= &state
->req
;
190 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_control
, data
);
191 state
->iov
[1].iov_base
= data
;
192 state
->iov
[1].iov_len
= datalen
;
194 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
196 if (tevent_req_nomem(subreq
, req
)) {
197 return tevent_req_post(req
, ev
);
199 tevent_req_set_callback(subreq
, ctdb_conn_control_written
, req
);
203 static void ctdb_conn_control_written(struct tevent_req
*subreq
)
205 struct tevent_req
*req
= tevent_req_callback_data(
206 subreq
, struct tevent_req
);
207 struct ctdb_conn_control_state
*state
= tevent_req_data(
208 req
, struct ctdb_conn_control_state
);
212 written
= writev_recv(subreq
, &err
);
215 tevent_req_error(req
, err
);
218 subreq
= read_packet_send(
219 state
, state
->ev
, state
->conn
->fd
, sizeof(uint32_t),
220 ctdb_packet_more
, NULL
);
221 if (tevent_req_nomem(subreq
, req
)) {
224 tevent_req_set_callback(subreq
, ctdb_conn_control_done
, req
);
/*
 * "more" callback for read_packet_send(): decide how many bytes are
 * still missing from a CTDB packet.
 *
 * buf/buflen: what has been read so far
 * p:          unused private pointer
 *
 * The first sizeof(uint32_t) bytes of a packet hold its total length,
 * including the length field itself. Once more than those bytes are
 * present the packet is complete and 0 is returned. Otherwise the
 * return value is the number of bytes that follow the length field.
 *
 * Returns -1 if the length field has not been read completely or if it
 * advertises a packet shorter than the field itself; previously the
 * latter case wrapped around in the unsigned subtraction.
 * NOTE(review): -1 is assumed to make read_packet fail the request --
 * confirm against lib/async_req/async_sock.
 */
static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p)
{
	uint32_t len;

	(void)p;

	if (buflen > sizeof(uint32_t)) {
		/* Been here, done */
		return 0;
	}
	if (buflen < sizeof(uint32_t)) {
		/* Length field incomplete, nothing sane to report */
		return -1;
	}

	/* memcpy avoids any alignment assumption about buf */
	memcpy(&len, buf, sizeof(len));
	if (len < sizeof(uint32_t)) {
		/* A packet cannot be shorter than its own length field */
		return -1;
	}
	return (ssize_t)(len - sizeof(uint32_t));
}
239 static void ctdb_conn_control_done(struct tevent_req
*subreq
)
241 struct tevent_req
*req
= tevent_req_callback_data(
242 subreq
, struct tevent_req
);
243 struct ctdb_conn_control_state
*state
= tevent_req_data(
244 req
, struct ctdb_conn_control_state
);
249 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
252 tevent_req_error(req
, err
);
255 state
->reply
= (struct ctdb_reply_control
*)buf
;
256 tevent_req_done(req
);
259 int ctdb_conn_control_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
260 struct ctdb_reply_control
**preply
)
262 struct ctdb_conn_control_state
*state
= tevent_req_data(
263 req
, struct ctdb_conn_control_state
);
266 if (tevent_req_is_unix_error(req
, &err
)) {
269 if (preply
!= NULL
) {
270 *preply
= talloc_move(mem_ctx
, &state
->reply
);
275 struct ctdb_conn_msg_write_state
{
276 struct ctdb_req_message ctdb_msg
;
280 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
);
282 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
283 struct tevent_context
*ev
,
284 struct ctdb_conn
*conn
,
285 uint32_t vnn
, uint64_t srvid
,
286 uint8_t *msg
, size_t msg_len
)
288 struct tevent_req
*req
, *subreq
;
289 struct ctdb_conn_msg_write_state
*state
;
290 struct ctdb_req_header
*h
;
292 req
= tevent_req_create(mem_ctx
, &state
,
293 struct ctdb_conn_msg_write_state
);
298 h
= &state
->ctdb_msg
.hdr
;
300 h
->length
= offsetof(struct ctdb_req_message
, data
) + msg_len
;
301 h
->ctdb_magic
= CTDB_MAGIC
;
302 h
->ctdb_version
= CTDB_VERSION
;
304 h
->operation
= CTDB_REQ_MESSAGE
;
306 h
->srcnode
= CTDB_CURRENT_NODE
;
308 state
->ctdb_msg
.srvid
= srvid
;
309 state
->ctdb_msg
.datalen
= msg_len
;
311 state
->iov
[0].iov_base
= &state
->ctdb_msg
;
312 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_message
, data
);
313 state
->iov
[1].iov_base
= msg
;
314 state
->iov
[1].iov_len
= msg_len
;
316 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
318 if (tevent_req_nomem(subreq
, req
)) {
319 return tevent_req_post(req
, ev
);
321 tevent_req_set_callback(subreq
, ctdb_conn_msg_write_done
, req
);
325 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
)
327 struct tevent_req
*req
= tevent_req_callback_data(
328 subreq
, struct tevent_req
);
332 written
= writev_recv(subreq
, &err
);
335 tevent_req_error(req
, err
);
338 tevent_req_done(req
);
/*
 * Collect the result of ctdb_conn_msg_write_send().
 *
 * Returns 0 on success, otherwise the unix error the request finished
 * with.
 */
int ctdb_conn_msg_write_recv(struct tevent_req *req)
{
	int unix_err;

	if (!tevent_req_is_unix_error(req, &unix_err)) {
		return 0;
	}
	return unix_err;
}
350 struct ctdb_msg_channel
{
351 struct ctdb_conn
*conn
;
354 struct ctdb_msg_channel_init_state
{
355 struct tevent_context
*ev
;
356 struct ctdb_conn
*conn
;
358 struct ctdb_msg_channel
*channel
;
361 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
);
362 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
);
364 struct tevent_req
*ctdb_msg_channel_init_send(
365 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
366 const char *sock
, uint64_t srvid
)
368 struct tevent_req
*req
, *subreq
;
369 struct ctdb_msg_channel_init_state
*state
;
371 req
= tevent_req_create(mem_ctx
, &state
,
372 struct ctdb_msg_channel_init_state
);
377 state
->srvid
= srvid
;
379 subreq
= ctdb_conn_init_send(state
, ev
, sock
);
380 if (tevent_req_nomem(subreq
, req
)) {
381 return tevent_req_post(req
, ev
);
383 tevent_req_set_callback(subreq
, ctdb_msg_channel_init_connected
, req
);
387 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
)
389 struct tevent_req
*req
= tevent_req_callback_data(
390 subreq
, struct tevent_req
);
391 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
392 req
, struct ctdb_msg_channel_init_state
);
395 ret
= ctdb_conn_init_recv(subreq
, state
, &state
->conn
);
397 if (tevent_req_error(req
, ret
)) {
400 subreq
= ctdb_conn_control_send(state
, state
->ev
, state
->conn
,
402 CTDB_CONTROL_REGISTER_SRVID
,
403 state
->srvid
, 0, NULL
, 0);
404 if (tevent_req_nomem(subreq
, req
)) {
407 tevent_req_set_callback(
408 subreq
, ctdb_msg_channel_init_registered_srvid
, req
);
411 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
)
413 struct tevent_req
*req
= tevent_req_callback_data(
414 subreq
, struct tevent_req
);
415 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
416 req
, struct ctdb_msg_channel_init_state
);
417 struct ctdb_reply_control
*reply
;
420 ret
= ctdb_conn_control_recv(subreq
, talloc_tos(), &reply
);
422 if (tevent_req_error(req
, ret
)) {
425 if (reply
->status
!= 0) {
426 tevent_req_error(req
, EIO
);
429 state
->channel
= talloc(state
, struct ctdb_msg_channel
);
430 if (tevent_req_nomem(state
->channel
, req
)) {
433 state
->channel
->conn
= talloc_move(state
->channel
, &state
->conn
);
434 tevent_req_done(req
);
437 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
438 struct ctdb_msg_channel
**pchannel
)
440 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
441 req
, struct ctdb_msg_channel_init_state
);
444 if (tevent_req_is_unix_error(req
, &err
)) {
447 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
451 struct ctdb_msg_read_state
{
456 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
);
458 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
459 struct tevent_context
*ev
,
460 struct ctdb_msg_channel
*channel
)
462 struct tevent_req
*req
, *subreq
;
463 struct ctdb_msg_read_state
*state
;
465 req
= tevent_req_create(mem_ctx
, &state
,
466 struct ctdb_msg_read_state
);
470 subreq
= read_packet_send(state
, ev
, channel
->conn
->fd
,
471 sizeof(uint32_t), ctdb_packet_more
, NULL
);
472 if (tevent_req_nomem(subreq
, req
)) {
473 return tevent_req_post(req
, ev
);
475 tevent_req_set_callback(subreq
, ctdb_msg_channel_got_msg
, req
);
479 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
)
481 struct tevent_req
*req
= tevent_req_callback_data(
482 subreq
, struct tevent_req
);
483 struct ctdb_msg_read_state
*state
= tevent_req_data(
484 req
, struct ctdb_msg_read_state
);
489 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
491 tevent_req_error(req
, err
);
494 state
->buflen
= nread
;
496 tevent_req_done(req
);
499 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
500 uint8_t **pmsg
, size_t *pmsg_len
)
502 struct ctdb_msg_read_state
*state
= tevent_req_data(
503 req
, struct ctdb_msg_read_state
);
504 struct ctdb_req_header
*hdr
;
505 struct ctdb_req_message
*msg
;
509 if (tevent_req_is_unix_error(req
, &err
)) {
513 hdr
= (struct ctdb_req_header
*)state
->buf
;
514 if (hdr
->length
!= state
->buflen
) {
515 DEBUG(10, ("Got invalid header length\n"));
518 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
519 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
520 CTDB_REQ_MESSAGE
, (int)hdr
->operation
));
523 if (hdr
->length
< offsetof(struct ctdb_req_message
, data
)) {
524 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr
->length
));
528 msg
= (struct ctdb_req_message
*)hdr
;
530 hdr
->length
- offsetof(struct ctdb_req_message
, data
)) {
531 DEBUG(10, ("Got invalid datalen %d\n", (int)msg
->datalen
));
535 buf
= (uint8_t *)talloc_memdup(mem_ctx
, msg
->data
, msg
->datalen
);
540 *pmsg_len
= msg
->datalen
;
550 static struct tevent_req
*dummy_send(TALLOC_CTX
*mem_ctx
,
551 struct tevent_context
*ev
)
553 struct tevent_req
*req
;
554 struct dummy_state
*state
;
555 req
= tevent_req_create(mem_ctx
, &state
, struct dummy_state
);
559 tevent_req_done(req
);
560 return tevent_req_post(req
, ev
);
563 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
564 struct tevent_context
*ev
,
567 return dummy_send(mem_ctx
, ev
);
570 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
571 struct ctdb_conn
**pconn
)
576 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
577 struct tevent_context
*ev
,
578 struct ctdb_conn
*conn
,
579 uint32_t vnn
, uint64_t srvid
,
580 uint8_t *msg
, size_t msg_len
)
582 return dummy_send(mem_ctx
, ev
);
585 int ctdb_conn_msg_write_recv(struct tevent_req
*req
)
590 struct tevent_req
*ctdb_msg_channel_init_send(
591 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
592 const char *sock
, uint64_t srvid
)
594 return dummy_send(mem_ctx
, ev
);
597 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
598 struct ctdb_msg_channel
**pchannel
)
603 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
604 struct tevent_context
*ev
,
605 struct ctdb_msg_channel
*channel
)
607 return dummy_send(mem_ctx
, ev
);
610 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
611 uint8_t **pmsg
, size_t *pmsg_len
)