2 Unix SMB/CIFS implementation.
3 Samba3 ctdb connection handling
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
24 #ifdef CLUSTER_SUPPORT
26 #include "lib/async_req/async_sock.h"
/*
 * Queue pointer that later code reaches as conn->outqueue and passes to
 * writev_send(). The enclosing declaration is not visible in this view;
 * presumably this is a member of struct ctdb_conn -- confirm.
 */
30 struct tevent_queue
*outqueue
;
/*
 * Request state used by ctdb_conn_init_send() and its completion/recv
 * functions.
 */
33 struct ctdb_conn_init_state
{
/* Address handed to async_connect_send(); family and path are set in ctdb_conn_init_send() */
34 struct sockaddr_un addr
;
/* Connection being built; ctdb_conn_init_recv() moves it to the caller's context */
35 struct ctdb_conn
*conn
;
39 * use the callbacks of async_connect_send to make sure
40 * we are connecting to CTDB as root
/*
 * Passed to async_connect_send() by ctdb_conn_init_send() as the first of
 * its two callbacks. The body is not visible in this view; per the note
 * above it is part of connecting to CTDB as root.
 */
42 static void before_connect_cb(void *private_data
) {
/*
 * Passed to async_connect_send() by ctdb_conn_init_send() as the second of
 * its two callbacks. The body is not visible in this view; presumably it
 * undoes whatever before_connect_cb() set up -- confirm.
 */
46 static void after_connect_cb(void *private_data
) {
/* Forward declaration: completion callback for the async connect started in ctdb_conn_init_send() */
50 static void ctdb_conn_init_done(struct tevent_req
*subreq
);
/* Forward declaration: talloc destructor installed on the connection object */
51 static int ctdb_conn_destructor(struct ctdb_conn
*conn
);
/*
 * Start an asynchronous connect to a ctdb unix domain socket.
 *
 * mem_ctx: talloc parent passed to tevent_req_create()
 * ev:      tevent context for the connect and for posting early failures
 * The body also reads a path named "sock"; its parameter declaration is
 * not visible in this view.
 *
 * Visible failure paths finish the request via tevent_req_post() with
 * ENOSYS (clustering disabled), ENAMETOOLONG (path does not fit into
 * sun_path), the errno of socket(), or an out-of-memory condition.
 */
53 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
54 struct tevent_context
*ev
,
57 struct tevent_req
*req
, *subreq
;
58 struct ctdb_conn_init_state
*state
;
/* Allocate the request together with its ctdb_conn_init_state */
60 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_conn_init_state
);
/* Refuse to connect when clustering is not enabled */
65 if (!lp_clustering()) {
66 tevent_req_error(req
, ENOSYS
);
67 return tevent_req_post(req
, ev
);
/* The path plus its terminating NUL must fit into sun_path */
70 if (strlen(sock
) >= sizeof(state
->addr
.sun_path
)) {
71 tevent_req_error(req
, ENAMETOOLONG
);
72 return tevent_req_post(req
, ev
);
/* The connection object is a talloc child of the request state */
75 state
->conn
= talloc(state
, struct ctdb_conn
);
76 if (tevent_req_nomem(state
->conn
, req
)) {
77 return tevent_req_post(req
, ev
);
/* Queue used by the writev_send() calls on this connection; owned by conn */
80 state
->conn
->outqueue
= tevent_queue_create(
81 state
->conn
, "ctdb outqueue");
82 if (tevent_req_nomem(state
->conn
->outqueue
, req
)) {
83 return tevent_req_post(req
, ev
);
86 state
->conn
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
87 if (state
->conn
->fd
== -1) {
88 tevent_req_error(req
, errno
);
89 return tevent_req_post(req
, ev
);
/* Destructor is installed only after socket() has succeeded */
91 talloc_set_destructor(state
->conn
, ctdb_conn_destructor
);
93 state
->addr
.sun_family
= AF_UNIX
;
/*
 * The strlen() check above guarantees sock is shorter than sun_path, so
 * this bounded copy includes the terminating NUL.
 */
94 strncpy(state
->addr
.sun_path
, sock
, sizeof(state
->addr
.sun_path
));
/* Kick off the connect; the two callbacks run around it, with no private data */
96 subreq
= async_connect_send(state
, ev
, state
->conn
->fd
,
97 (struct sockaddr
*)&state
->addr
,
98 sizeof(state
->addr
), before_connect_cb
,
99 after_connect_cb
, NULL
);
100 if (tevent_req_nomem(subreq
, req
)) {
101 return tevent_req_post(req
, ev
);
/* ctdb_conn_init_done() finishes req once the connect completes */
103 tevent_req_set_callback(subreq
, ctdb_conn_init_done
, req
);
/*
 * talloc destructor registered on the connection by ctdb_conn_init_send()
 * after its socket has been created. The body is not visible in this
 * view; presumably it releases c->fd -- confirm.
 */
107 static int ctdb_conn_destructor(struct ctdb_conn
*c
)
/*
 * Completion callback for the async connect: collects the result with
 * async_connect_recv() and finishes the parent request with either the
 * reported error or success. The condition selecting between the two is
 * not visible in this view.
 */
116 static void ctdb_conn_init_done(struct tevent_req
*subreq
)
/* The parent request was stored as the subrequest's callback data */
118 struct tevent_req
*req
= tevent_req_callback_data(
119 subreq
, struct tevent_req
);
122 ret
= async_connect_recv(subreq
, &err
);
125 tevent_req_error(req
, err
);
128 tevent_req_done(req
);
/*
 * Collect the result of ctdb_conn_init_send().
 *
 * req:     the finished request
 * mem_ctx: talloc context that takes ownership of the connection
 * pconn:   receives the connection, moved out of the request state
 *
 * The request's unix error is checked first; what is returned in each case
 * is not visible in this view.
 */
131 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
132 struct ctdb_conn
**pconn
)
134 struct ctdb_conn_init_state
*state
= tevent_req_data(
135 req
, struct ctdb_conn_init_state
);
138 if (tevent_req_is_unix_error(req
, &err
)) {
/* Reparent the connection to mem_ctx; talloc_move() also clears state->conn */
141 *pconn
= talloc_move(mem_ctx
, &state
->conn
);
/*
 * Request state for ctdb_conn_control_send(). The code below also uses
 * state->iov[0] and state->iov[1]; that member's declaration is not
 * visible in this view.
 */
146 struct ctdb_conn_control_state
{
/* Event context, reused to read the reply after the write finished */
147 struct tevent_context
*ev
;
/* Connection whose fd is read for the reply */
148 struct ctdb_conn
*conn
;
/* Fixed-size part of the outgoing control packet (header plus control fields) */
149 struct ctdb_req_control req
;
/* Reply buffer as returned by read_packet_recv(), cast to the reply type */
151 struct ctdb_reply_control
*reply
;
/* Forward declaration: called when the control request has been written */
154 static void ctdb_conn_control_written(struct tevent_req
*subreq
);
/* Forward declaration: called when the control reply packet has been read */
155 static void ctdb_conn_control_done(struct tevent_req
*subreq
);
/* Forward declaration: "more bytes needed" callback handed to read_packet_send() */
156 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
);
/*
 * Send a CTDB_REQ_CONTROL packet on a connection and arrange for the reply
 * to be read afterwards.
 *
 * mem_ctx: talloc parent for the request
 * ev:      tevent context
 * conn:    connection providing the fd and the output queue
 * vnn:     not referenced in the lines visible in this view
 * opcode:  control opcode, stored in the request
 * srvid:   server id, stored in the request
 * flags:   control flags, stored in the request
 * data:    optional payload; referenced (not copied) via the second iovec,
 *          so presumably it must stay valid until the write completes
 * datalen: payload length in bytes
 */
158 struct tevent_req
*ctdb_conn_control_send(TALLOC_CTX
*mem_ctx
,
159 struct tevent_context
*ev
,
160 struct ctdb_conn
*conn
,
161 uint32_t vnn
, uint32_t opcode
,
162 uint64_t srvid
, uint32_t flags
,
163 uint8_t *data
, size_t datalen
)
165 struct tevent_req
*req
, *subreq
;
166 struct ctdb_conn_control_state
*state
;
167 struct ctdb_req_header
*hdr
;
169 req
= tevent_req_create(mem_ctx
, &state
,
170 struct ctdb_conn_control_state
);
/* Fill the common ctdb header; length covers the fixed part plus the payload */
177 hdr
= &state
->req
.hdr
;
178 hdr
->length
= offsetof(struct ctdb_req_control
, data
) + datalen
;
179 hdr
->ctdb_magic
= CTDB_MAGIC
;
180 hdr
->ctdb_version
= CTDB_VERSION
;
181 hdr
->operation
= CTDB_REQ_CONTROL
;
/* Request id is hard-coded; the original author marked this as unfinished */
182 hdr
->reqid
= 1; /* FIXME */
/* Control-specific fields */
184 state
->req
.opcode
= opcode
;
185 state
->req
.srvid
= srvid
;
186 state
->req
.datalen
= datalen
;
187 state
->req
.flags
= flags
;
/* iov[0]: fixed part of the packet up to the data member; iov[1]: caller's payload */
189 state
->iov
[0].iov_base
= &state
->req
;
190 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_control
, data
);
191 state
->iov
[1].iov_base
= data
;
192 state
->iov
[1].iov_len
= datalen
;
/* Queue the write on the connection's outqueue (remaining arguments are not visible here) */
194 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
196 if (tevent_req_nomem(subreq
, req
)) {
197 return tevent_req_post(req
, ev
);
199 tevent_req_set_callback(subreq
, ctdb_conn_control_written
, req
);
/*
 * Write-completion callback for ctdb_conn_control_send(): collects the
 * writev result and then starts reading the reply packet from the same
 * connection. On a reported error the parent request is failed with it;
 * the exact condition is not visible in this view.
 */
203 static void ctdb_conn_control_written(struct tevent_req
*subreq
)
205 struct tevent_req
*req
= tevent_req_callback_data(
206 subreq
, struct tevent_req
);
207 struct ctdb_conn_control_state
*state
= tevent_req_data(
208 req
, struct ctdb_conn_control_state
);
212 written
= writev_recv(subreq
, &err
);
215 tevent_req_error(req
, err
);
/*
 * Read the reply: start with the 4-byte length word and let
 * ctdb_packet_more() tell the reader how many more bytes follow.
 */
218 subreq
= read_packet_send(
219 state
, state
->ev
, state
->conn
->fd
, sizeof(uint32_t),
220 ctdb_packet_more
, NULL
);
221 if (tevent_req_nomem(subreq
, req
)) {
224 tevent_req_set_callback(subreq
, ctdb_conn_control_done
, req
);
/*
 * Callback given to read_packet_send() to size a ctdb packet.
 *
 * buf:    bytes read so far
 * buflen: number of bytes in buf
 * p:      callback private data; not referenced in the visible lines
 *
 * The first 4 bytes of buf are taken as the packet length. The final
 * visible return yields that length minus the 4 bytes already read. The
 * results of the two earlier branches are not visible in this view.
 */
227 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
)
/* More than the length word is already buffered: a previous call handled this packet */
231 if (buflen
> sizeof(uint32_t)) {
232 /* Been here, done */
/* memcpy rather than a pointer cast, so buf needs no particular alignment */
235 memcpy(&len
, buf
, sizeof(len
));
/* A length smaller than the length word itself cannot be valid */
237 if (len
< sizeof(uint32_t)) {
/* Remaining bytes of the packet after the length word */
241 return (len
- sizeof(uint32_t));
/*
 * Read-completion callback for a control request: collects the reply
 * packet, stores it in the request state and marks the request done.
 * On a reported error the parent request is failed with it; the exact
 * condition is not visible in this view.
 */
244 static void ctdb_conn_control_done(struct tevent_req
*subreq
)
246 struct tevent_req
*req
= tevent_req_callback_data(
247 subreq
, struct tevent_req
);
248 struct ctdb_conn_control_state
*state
= tevent_req_data(
249 req
, struct ctdb_conn_control_state
);
/* The packet buffer is allocated on state */
254 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
257 tevent_req_error(req
, err
);
/*
 * NOTE(review): no check of nread against the size of
 * struct ctdb_reply_control is visible before this cast -- confirm that
 * a short packet cannot reach readers of reply->status.
 */
260 state
->reply
= (struct ctdb_reply_control
*)buf
;
261 tevent_req_done(req
);
/*
 * Collect the result of ctdb_conn_control_send().
 *
 * req:     the finished request
 * mem_ctx: talloc context that takes ownership of the reply
 * preply:  optional; when non-NULL it receives the reply packet
 *
 * The request's unix error is checked first; the returned values are not
 * visible in this view.
 */
264 int ctdb_conn_control_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
265 struct ctdb_reply_control
**preply
)
267 struct ctdb_conn_control_state
*state
= tevent_req_data(
268 req
, struct ctdb_conn_control_state
);
271 if (tevent_req_is_unix_error(req
, &err
)) {
/* Callers not interested in the reply may pass NULL */
274 if (preply
!= NULL
) {
275 *preply
= talloc_move(mem_ctx
, &state
->reply
);
/*
 * Request state for ctdb_conn_msg_write_send(). The code below also uses
 * state->iov[0] and state->iov[1]; that member's declaration is not
 * visible in this view.
 */
280 struct ctdb_conn_msg_write_state
{
/* Fixed-size part of the outgoing message packet */
281 struct ctdb_req_message ctdb_msg
;
/* Forward declaration: write-completion callback for ctdb_conn_msg_write_send() */
285 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
);
/*
 * Send a CTDB_REQ_MESSAGE packet on a connection.
 *
 * mem_ctx: talloc parent for the request
 * ev:      tevent context
 * conn:    connection providing the fd and the output queue
 * vnn:     not referenced in the lines visible in this view
 * srvid:   server id stored in the message
 * msg:     payload; referenced (not copied) via the second iovec, so
 *          presumably it must stay valid until the write completes
 * msg_len: payload length in bytes
 */
287 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
288 struct tevent_context
*ev
,
289 struct ctdb_conn
*conn
,
290 uint32_t vnn
, uint64_t srvid
,
291 uint8_t *msg
, size_t msg_len
)
293 struct tevent_req
*req
, *subreq
;
294 struct ctdb_conn_msg_write_state
*state
;
295 struct ctdb_req_header
*h
;
297 req
= tevent_req_create(mem_ctx
, &state
,
298 struct ctdb_conn_msg_write_state
);
/* Fill the common ctdb header; length covers the fixed part plus the payload */
303 h
= &state
->ctdb_msg
.hdr
;
305 h
->length
= offsetof(struct ctdb_req_message
, data
) + msg_len
;
306 h
->ctdb_magic
= CTDB_MAGIC
;
307 h
->ctdb_version
= CTDB_VERSION
;
309 h
->operation
= CTDB_REQ_MESSAGE
;
311 h
->srcnode
= CTDB_CURRENT_NODE
;
/* Message-specific fields */
313 state
->ctdb_msg
.srvid
= srvid
;
314 state
->ctdb_msg
.datalen
= msg_len
;
/* iov[0]: fixed part of the packet up to the data member; iov[1]: caller's payload */
316 state
->iov
[0].iov_base
= &state
->ctdb_msg
;
317 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_message
, data
);
318 state
->iov
[1].iov_base
= msg
;
319 state
->iov
[1].iov_len
= msg_len
;
/* Queue the write on the connection's outqueue (remaining arguments are not visible here) */
321 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
323 if (tevent_req_nomem(subreq
, req
)) {
324 return tevent_req_post(req
, ev
);
326 tevent_req_set_callback(subreq
, ctdb_conn_msg_write_done
, req
);
/*
 * Write-completion callback for ctdb_conn_msg_write_send(): collects the
 * writev result and finishes the parent request with either the reported
 * error or success. The condition selecting between the two is not
 * visible in this view.
 */
330 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
)
332 struct tevent_req
*req
= tevent_req_callback_data(
333 subreq
, struct tevent_req
);
337 written
= writev_recv(subreq
, &err
);
340 tevent_req_error(req
, err
);
343 tevent_req_done(req
);
/*
 * Collect the result of ctdb_conn_msg_write_send(). Checks the request
 * for a unix error; the returned values are not visible in this view.
 */
346 int ctdb_conn_msg_write_recv(struct tevent_req
*req
)
349 if (tevent_req_is_unix_error(req
, &err
)) {
/*
 * Message channel handed out by ctdb_msg_channel_init_recv() and read
 * from by ctdb_msg_read_send().
 */
355 struct ctdb_msg_channel
{
/* Underlying connection; reparented under the channel once the srvid is registered */
356 struct ctdb_conn
*conn
;
/*
 * Request state for ctdb_msg_channel_init_send(). The code below also
 * uses state->srvid; that member's declaration is not visible in this
 * view.
 */
359 struct ctdb_msg_channel_init_state
{
/* Event context reused for the follow-up control request */
360 struct tevent_context
*ev
;
/* Connection obtained from ctdb_conn_init_recv() */
361 struct ctdb_conn
*conn
;
/* Resulting channel; moved to the caller by ctdb_msg_channel_init_recv() */
363 struct ctdb_msg_channel
*channel
;
/* Forward declaration: step 1 of channel setup, called when the connection is established */
366 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
);
/* Forward declaration: step 2 of channel setup, called when the srvid registration reply arrived */
367 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
);
/*
 * Start setting up a message channel: connect to the ctdb socket, then
 * (in the callbacks) register a server id on that connection.
 *
 * mem_ctx: talloc parent for the request
 * ev:      tevent context
 * sock:    socket path, passed on to ctdb_conn_init_send()
 * srvid:   server id to register
 */
369 struct tevent_req
*ctdb_msg_channel_init_send(
370 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
371 const char *sock
, uint64_t srvid
)
373 struct tevent_req
*req
, *subreq
;
374 struct ctdb_msg_channel_init_state
*state
;
376 req
= tevent_req_create(mem_ctx
, &state
,
377 struct ctdb_msg_channel_init_state
);
/* Remember the srvid for the registration step in ctdb_msg_channel_init_connected() */
382 state
->srvid
= srvid
;
/* Step 1: establish the connection */
384 subreq
= ctdb_conn_init_send(state
, ev
, sock
);
385 if (tevent_req_nomem(subreq
, req
)) {
386 return tevent_req_post(req
, ev
);
388 tevent_req_set_callback(subreq
, ctdb_msg_channel_init_connected
, req
);
/*
 * Called when the connection attempt finished. On success, sends a
 * CTDB_CONTROL_REGISTER_SRVID control for state->srvid on the new
 * connection.
 */
392 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
)
394 struct tevent_req
*req
= tevent_req_callback_data(
395 subreq
, struct tevent_req
);
396 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
397 req
, struct ctdb_msg_channel_init_state
);
/* The connection becomes a talloc child of the request state */
400 ret
= ctdb_conn_init_recv(subreq
, state
, &state
->conn
);
/* tevent_req_error() with a non-zero ret fails the request */
402 if (tevent_req_error(req
, ret
)) {
/*
 * Register the srvid with flags 0 and no payload. One argument line of
 * this call is not visible in this view.
 */
405 subreq
= ctdb_conn_control_send(state
, state
->ev
, state
->conn
,
407 CTDB_CONTROL_REGISTER_SRVID
,
408 state
->srvid
, 0, NULL
, 0);
409 if (tevent_req_nomem(subreq
, req
)) {
412 tevent_req_set_callback(
413 subreq
, ctdb_msg_channel_init_registered_srvid
, req
);
/*
 * Called when the srvid registration control finished. A non-zero reply
 * status fails the request with EIO; otherwise the channel object is
 * created, takes over the connection, and the request is marked done.
 */
416 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
)
418 struct tevent_req
*req
= tevent_req_callback_data(
419 subreq
, struct tevent_req
);
420 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
421 req
, struct ctdb_msg_channel_init_state
);
422 struct ctdb_reply_control
*reply
;
/* The reply is only inspected here, so it is placed on the talloc_tos() context */
425 ret
= ctdb_conn_control_recv(subreq
, talloc_tos(), &reply
);
427 if (tevent_req_error(req
, ret
)) {
/* ctdb reported a failure for the registration */
430 if (reply
->status
!= 0) {
431 tevent_req_error(req
, EIO
);
434 state
->channel
= talloc(state
, struct ctdb_msg_channel
);
435 if (tevent_req_nomem(state
->channel
, req
)) {
/* The channel now owns the connection; talloc_move() clears state->conn */
438 state
->channel
->conn
= talloc_move(state
->channel
, &state
->conn
);
439 tevent_req_done(req
);
/*
 * Collect the result of ctdb_msg_channel_init_send().
 *
 * req:      the finished request
 * mem_ctx:  talloc context that takes ownership of the channel
 * pchannel: receives the channel, moved out of the request state
 *
 * The request's unix error is checked first; the returned values are not
 * visible in this view.
 */
442 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
443 struct ctdb_msg_channel
**pchannel
)
445 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
446 req
, struct ctdb_msg_channel_init_state
);
449 if (tevent_req_is_unix_error(req
, &err
)) {
452 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
/*
 * Request state for ctdb_msg_read_send(). Its members are not visible in
 * this view; the code below uses state->buf and state->buflen.
 */
456 struct ctdb_msg_read_state
{
/* Forward declaration: read-completion callback for ctdb_msg_read_send() */
461 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
);
/*
 * Start reading one ctdb packet from a message channel.
 *
 * mem_ctx: talloc parent for the request
 * ev:      tevent context
 * channel: channel whose connection fd is read
 */
463 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
464 struct tevent_context
*ev
,
465 struct ctdb_msg_channel
*channel
)
467 struct tevent_req
*req
, *subreq
;
468 struct ctdb_msg_read_state
*state
;
470 req
= tevent_req_create(mem_ctx
, &state
,
471 struct ctdb_msg_read_state
);
/* Read the 4-byte length word first; ctdb_packet_more() sizes the rest of the packet */
475 subreq
= read_packet_send(state
, ev
, channel
->conn
->fd
,
476 sizeof(uint32_t), ctdb_packet_more
, NULL
);
477 if (tevent_req_nomem(subreq
, req
)) {
478 return tevent_req_post(req
, ev
);
480 tevent_req_set_callback(subreq
, ctdb_msg_channel_got_msg
, req
);
/*
 * Read-completion callback for ctdb_msg_read_send(): collects the packet,
 * records its length in the request state and marks the request done.
 * On a reported error the parent request is failed with it; the exact
 * condition is not visible in this view.
 */
484 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
)
486 struct tevent_req
*req
= tevent_req_callback_data(
487 subreq
, struct tevent_req
);
488 struct ctdb_msg_read_state
*state
= tevent_req_data(
489 req
, struct ctdb_msg_read_state
);
/* The packet buffer is allocated on state */
494 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
496 tevent_req_error(req
, err
);
/* Number of bytes received; ctdb_msg_read_recv() compares it with the header length */
499 state
->buflen
= nread
;
501 tevent_req_done(req
);
/*
 * Collect the result of ctdb_msg_read_send(): validate the received packet
 * as a CTDB_REQ_MESSAGE and hand a copy of its payload to the caller.
 *
 * req:      the finished request
 * mem_ctx:  talloc context for the payload copy
 * pmsg:     presumably receives the payload copy (the assignment is not
 *           visible in this view)
 * pmsg_len: receives the payload length (msg->datalen)
 *
 * The values returned on the individual failure paths are not visible in
 * this view.
 */
504 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
505 uint8_t **pmsg
, size_t *pmsg_len
)
507 struct ctdb_msg_read_state
*state
= tevent_req_data(
508 req
, struct ctdb_msg_read_state
);
509 struct ctdb_req_header
*hdr
;
510 struct ctdb_req_message
*msg
;
514 if (tevent_req_is_unix_error(req
, &err
)) {
/* The length field in the header must match the number of bytes received */
518 hdr
= (struct ctdb_req_header
*)state
->buf
;
519 if (hdr
->length
!= state
->buflen
) {
520 DEBUG(10, ("Got invalid header length\n"));
/*
 * NOTE(review): hdr->operation is read here before the minimum-length
 * check below. If a packet shorter than the header can get this far,
 * this reads past the received bytes -- confirm.
 */
523 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
524 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
525 CTDB_REQ_MESSAGE
, (int)hdr
->operation
));
/* The packet must at least contain the fixed part of a message */
528 if (hdr
->length
< offsetof(struct ctdb_req_message
, data
)) {
529 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr
->length
));
533 msg
= (struct ctdb_req_message
*)hdr
;
/*
 * The start of this condition is not visible in this view; going by the
 * log message it checks msg->datalen against the bytes remaining after
 * the fixed part.
 */
535 hdr
->length
- offsetof(struct ctdb_req_message
, data
)) {
536 DEBUG(10, ("Got invalid datalen %d\n", (int)msg
->datalen
));
/* The caller gets its own copy of the payload, allocated on mem_ctx */
540 buf
= (uint8_t *)talloc_memdup(mem_ctx
, msg
->data
, msg
->datalen
);
545 *pmsg_len
= msg
->datalen
;
/*
 * Helper for the stub *_send() functions below: creates a request that is
 * marked done immediately and posted to the event context. The definition
 * of struct dummy_state is not visible in this view.
 */
555 static struct tevent_req
*dummy_send(TALLOC_CTX
*mem_ctx
,
556 struct tevent_context
*ev
)
558 struct tevent_req
*req
;
559 struct dummy_state
*state
;
560 req
= tevent_req_create(mem_ctx
, &state
, struct dummy_state
);
/* Nothing to wait for: finish at once */
564 tevent_req_done(req
);
565 return tevent_req_post(req
, ev
);
/*
 * Second definition of ctdb_conn_init_send(): returns an already-finished
 * dummy request. Presumably this is the variant built without
 * CLUSTER_SUPPORT (the preprocessor branch is not visible) -- confirm.
 */
568 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
569 struct tevent_context
*ev
,
572 return dummy_send(mem_ctx
, ev
);
/*
 * Second definition of ctdb_conn_init_recv(), matching the stub
 * ctdb_conn_init_send(). The body is not visible in this view.
 */
575 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
576 struct ctdb_conn
**pconn
)
/*
 * Second definition of ctdb_conn_msg_write_send(): ignores the connection
 * and message arguments and returns an already-finished dummy request.
 * Presumably the variant built without CLUSTER_SUPPORT -- confirm.
 */
581 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
582 struct tevent_context
*ev
,
583 struct ctdb_conn
*conn
,
584 uint32_t vnn
, uint64_t srvid
,
585 uint8_t *msg
, size_t msg_len
)
587 return dummy_send(mem_ctx
, ev
);
/*
 * Second definition of ctdb_conn_msg_write_recv(), matching the stub
 * ctdb_conn_msg_write_send(). The body is not visible in this view.
 */
590 int ctdb_conn_msg_write_recv(struct tevent_req
*req
)
/*
 * Second definition of ctdb_msg_channel_init_send(): ignores sock and
 * srvid and returns an already-finished dummy request. Presumably the
 * variant built without CLUSTER_SUPPORT -- confirm.
 */
595 struct tevent_req
*ctdb_msg_channel_init_send(
596 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
597 const char *sock
, uint64_t srvid
)
599 return dummy_send(mem_ctx
, ev
);
/*
 * Second definition of ctdb_msg_channel_init_recv(), matching the stub
 * ctdb_msg_channel_init_send(). The body is not visible in this view.
 */
602 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
603 struct ctdb_msg_channel
**pchannel
)
/*
 * Second definition of ctdb_msg_read_send(): ignores the channel and
 * returns an already-finished dummy request. Presumably the variant built
 * without CLUSTER_SUPPORT -- confirm.
 */
608 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
609 struct tevent_context
*ev
,
610 struct ctdb_msg_channel
*channel
)
612 return dummy_send(mem_ctx
, ev
);
/*
 * Second definition of ctdb_msg_read_recv(), matching the stub
 * ctdb_msg_read_send(). The body is not visible in this view.
 */
615 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
616 uint8_t **pmsg
, size_t *pmsg_len
)