2 Unix SMB/CIFS implementation.
3 Samba3 ctdb connection handling
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
26 #ifdef HAVE_CTDB_PROTOCOL_H
27 #include <ctdb_protocol.h>
29 #include <ctdb_private.h>
32 #include "lib/async_req/async_sock.h"
36 struct tevent_queue
*outqueue
;
39 struct ctdb_conn_init_state
{
40 struct sockaddr_un addr
;
41 struct ctdb_conn
*conn
;
45 * use the callbacks of async_connect_send to make sure
46 * we are connecting to CTDB as root
48 static void before_connect_cb(void *private_data
) {
52 static void after_connect_cb(void *private_data
) {
56 static void ctdb_conn_init_done(struct tevent_req
*subreq
);
57 static int ctdb_conn_destructor(struct ctdb_conn
*conn
);
59 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
60 struct tevent_context
*ev
,
63 struct tevent_req
*req
, *subreq
;
64 struct ctdb_conn_init_state
*state
;
66 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_conn_init_state
);
71 if (!lp_clustering()) {
72 tevent_req_error(req
, ENOSYS
);
73 return tevent_req_post(req
, ev
);
76 if (strlen(sock
) >= sizeof(state
->addr
.sun_path
)) {
77 tevent_req_error(req
, ENAMETOOLONG
);
78 return tevent_req_post(req
, ev
);
81 state
->conn
= talloc(state
, struct ctdb_conn
);
82 if (tevent_req_nomem(state
->conn
, req
)) {
83 return tevent_req_post(req
, ev
);
86 state
->conn
->outqueue
= tevent_queue_create(
87 state
->conn
, "ctdb outqueue");
88 if (tevent_req_nomem(state
->conn
->outqueue
, req
)) {
89 return tevent_req_post(req
, ev
);
92 state
->conn
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
93 if (state
->conn
->fd
== -1) {
94 tevent_req_error(req
, errno
);
95 return tevent_req_post(req
, ev
);
97 talloc_set_destructor(state
->conn
, ctdb_conn_destructor
);
99 state
->addr
.sun_family
= AF_UNIX
;
100 strncpy(state
->addr
.sun_path
, sock
, sizeof(state
->addr
.sun_path
));
102 subreq
= async_connect_send(state
, ev
, state
->conn
->fd
,
103 (struct sockaddr
*)&state
->addr
,
104 sizeof(state
->addr
), before_connect_cb
,
105 after_connect_cb
, NULL
);
106 if (tevent_req_nomem(subreq
, req
)) {
107 return tevent_req_post(req
, ev
);
109 tevent_req_set_callback(subreq
, ctdb_conn_init_done
, req
);
113 static int ctdb_conn_destructor(struct ctdb_conn
*c
)
122 static void ctdb_conn_init_done(struct tevent_req
*subreq
)
124 struct tevent_req
*req
= tevent_req_callback_data(
125 subreq
, struct tevent_req
);
128 ret
= async_connect_recv(subreq
, &err
);
131 tevent_req_error(req
, err
);
134 tevent_req_done(req
);
137 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
138 struct ctdb_conn
**pconn
)
140 struct ctdb_conn_init_state
*state
= tevent_req_data(
141 req
, struct ctdb_conn_init_state
);
144 if (tevent_req_is_unix_error(req
, &err
)) {
147 *pconn
= talloc_move(mem_ctx
, &state
->conn
);
152 struct ctdb_conn_control_state
{
153 struct tevent_context
*ev
;
154 struct ctdb_conn
*conn
;
155 struct ctdb_req_control req
;
157 struct ctdb_reply_control
*reply
;
160 static void ctdb_conn_control_written(struct tevent_req
*subreq
);
161 static void ctdb_conn_control_done(struct tevent_req
*subreq
);
162 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
);
164 struct tevent_req
*ctdb_conn_control_send(TALLOC_CTX
*mem_ctx
,
165 struct tevent_context
*ev
,
166 struct ctdb_conn
*conn
,
167 uint32_t vnn
, uint32_t opcode
,
168 uint64_t srvid
, uint32_t flags
,
169 uint8_t *data
, size_t datalen
)
171 struct tevent_req
*req
, *subreq
;
172 struct ctdb_conn_control_state
*state
;
173 struct ctdb_req_header
*hdr
;
175 req
= tevent_req_create(mem_ctx
, &state
,
176 struct ctdb_conn_control_state
);
183 hdr
= &state
->req
.hdr
;
184 hdr
->length
= offsetof(struct ctdb_req_control
, data
) + datalen
;
185 hdr
->ctdb_magic
= CTDB_MAGIC
;
186 hdr
->ctdb_version
= CTDB_VERSION
;
187 hdr
->operation
= CTDB_REQ_CONTROL
;
188 hdr
->reqid
= 1; /* FIXME */
190 state
->req
.opcode
= opcode
;
191 state
->req
.srvid
= srvid
;
192 state
->req
.datalen
= datalen
;
193 state
->req
.flags
= flags
;
195 state
->iov
[0].iov_base
= &state
->req
;
196 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_control
, data
);
197 state
->iov
[1].iov_base
= data
;
198 state
->iov
[1].iov_len
= datalen
;
200 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
202 if (tevent_req_nomem(subreq
, req
)) {
203 return tevent_req_post(req
, ev
);
205 tevent_req_set_callback(subreq
, ctdb_conn_control_written
, req
);
209 static void ctdb_conn_control_written(struct tevent_req
*subreq
)
211 struct tevent_req
*req
= tevent_req_callback_data(
212 subreq
, struct tevent_req
);
213 struct ctdb_conn_control_state
*state
= tevent_req_data(
214 req
, struct ctdb_conn_control_state
);
218 written
= writev_recv(subreq
, &err
);
221 tevent_req_error(req
, err
);
224 subreq
= read_packet_send(
225 state
, state
->ev
, state
->conn
->fd
, sizeof(uint32_t),
226 ctdb_packet_more
, NULL
);
227 if (tevent_req_nomem(subreq
, req
)) {
230 tevent_req_set_callback(subreq
, ctdb_conn_control_done
, req
);
233 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
)
237 if (buflen
> sizeof(uint32_t)) {
238 /* Been here, done */
241 memcpy(&len
, buf
, sizeof(len
));
243 if (len
< sizeof(uint32_t)) {
247 return (len
- sizeof(uint32_t));
250 static void ctdb_conn_control_done(struct tevent_req
*subreq
)
252 struct tevent_req
*req
= tevent_req_callback_data(
253 subreq
, struct tevent_req
);
254 struct ctdb_conn_control_state
*state
= tevent_req_data(
255 req
, struct ctdb_conn_control_state
);
260 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
263 tevent_req_error(req
, err
);
266 state
->reply
= (struct ctdb_reply_control
*)buf
;
267 tevent_req_done(req
);
270 int ctdb_conn_control_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
271 struct ctdb_reply_control
**preply
)
273 struct ctdb_conn_control_state
*state
= tevent_req_data(
274 req
, struct ctdb_conn_control_state
);
277 if (tevent_req_is_unix_error(req
, &err
)) {
280 if (preply
!= NULL
) {
281 *preply
= talloc_move(mem_ctx
, &state
->reply
);
286 struct ctdb_conn_msg_write_state
{
287 struct ctdb_req_message ctdb_msg
;
291 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
);
293 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
294 struct tevent_context
*ev
,
295 struct ctdb_conn
*conn
,
296 uint32_t vnn
, uint64_t srvid
,
297 uint8_t *msg
, size_t msg_len
)
299 struct tevent_req
*req
, *subreq
;
300 struct ctdb_conn_msg_write_state
*state
;
301 struct ctdb_req_header
*h
;
303 req
= tevent_req_create(mem_ctx
, &state
,
304 struct ctdb_conn_msg_write_state
);
309 h
= &state
->ctdb_msg
.hdr
;
311 h
->length
= offsetof(struct ctdb_req_message
, data
) + msg_len
;
312 h
->ctdb_magic
= CTDB_MAGIC
;
313 h
->ctdb_version
= CTDB_VERSION
;
315 h
->operation
= CTDB_REQ_MESSAGE
;
317 h
->srcnode
= CTDB_CURRENT_NODE
;
319 state
->ctdb_msg
.srvid
= srvid
;
320 state
->ctdb_msg
.datalen
= msg_len
;
322 state
->iov
[0].iov_base
= &state
->ctdb_msg
;
323 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_message
, data
);
324 state
->iov
[1].iov_base
= msg
;
325 state
->iov
[1].iov_len
= msg_len
;
327 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
329 if (tevent_req_nomem(subreq
, req
)) {
330 return tevent_req_post(req
, ev
);
332 tevent_req_set_callback(subreq
, ctdb_conn_msg_write_done
, req
);
336 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
)
338 struct tevent_req
*req
= tevent_req_callback_data(
339 subreq
, struct tevent_req
);
343 written
= writev_recv(subreq
, &err
);
346 tevent_req_error(req
, err
);
349 tevent_req_done(req
);
352 int ctdb_conn_msg_write_recv(struct tevent_req
*req
)
355 if (tevent_req_is_unix_error(req
, &err
)) {
361 struct ctdb_msg_channel
{
362 struct ctdb_conn
*conn
;
365 struct ctdb_msg_channel_init_state
{
366 struct tevent_context
*ev
;
367 struct ctdb_conn
*conn
;
369 struct ctdb_msg_channel
*channel
;
372 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
);
373 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
);
375 struct tevent_req
*ctdb_msg_channel_init_send(
376 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
377 const char *sock
, uint64_t srvid
)
379 struct tevent_req
*req
, *subreq
;
380 struct ctdb_msg_channel_init_state
*state
;
382 req
= tevent_req_create(mem_ctx
, &state
,
383 struct ctdb_msg_channel_init_state
);
388 state
->srvid
= srvid
;
390 subreq
= ctdb_conn_init_send(state
, ev
, sock
);
391 if (tevent_req_nomem(subreq
, req
)) {
392 return tevent_req_post(req
, ev
);
394 tevent_req_set_callback(subreq
, ctdb_msg_channel_init_connected
, req
);
398 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
)
400 struct tevent_req
*req
= tevent_req_callback_data(
401 subreq
, struct tevent_req
);
402 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
403 req
, struct ctdb_msg_channel_init_state
);
406 ret
= ctdb_conn_init_recv(subreq
, state
, &state
->conn
);
408 if (tevent_req_error(req
, ret
)) {
411 subreq
= ctdb_conn_control_send(state
, state
->ev
, state
->conn
,
413 CTDB_CONTROL_REGISTER_SRVID
,
414 state
->srvid
, 0, NULL
, 0);
415 if (tevent_req_nomem(subreq
, req
)) {
418 tevent_req_set_callback(
419 subreq
, ctdb_msg_channel_init_registered_srvid
, req
);
422 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
)
424 struct tevent_req
*req
= tevent_req_callback_data(
425 subreq
, struct tevent_req
);
426 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
427 req
, struct ctdb_msg_channel_init_state
);
428 struct ctdb_reply_control
*reply
;
431 ret
= ctdb_conn_control_recv(subreq
, talloc_tos(), &reply
);
433 if (tevent_req_error(req
, ret
)) {
436 if (reply
->status
!= 0) {
437 tevent_req_error(req
, EIO
);
440 state
->channel
= talloc(state
, struct ctdb_msg_channel
);
441 if (tevent_req_nomem(state
->channel
, req
)) {
444 state
->channel
->conn
= talloc_move(state
->channel
, &state
->conn
);
445 tevent_req_done(req
);
448 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
449 struct ctdb_msg_channel
**pchannel
)
451 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
452 req
, struct ctdb_msg_channel_init_state
);
455 if (tevent_req_is_unix_error(req
, &err
)) {
458 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
462 struct ctdb_msg_read_state
{
467 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
);
469 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
470 struct tevent_context
*ev
,
471 struct ctdb_msg_channel
*channel
)
473 struct tevent_req
*req
, *subreq
;
474 struct ctdb_msg_read_state
*state
;
476 req
= tevent_req_create(mem_ctx
, &state
,
477 struct ctdb_msg_read_state
);
481 subreq
= read_packet_send(state
, ev
, channel
->conn
->fd
,
482 sizeof(uint32_t), ctdb_packet_more
, NULL
);
483 if (tevent_req_nomem(subreq
, req
)) {
484 return tevent_req_post(req
, ev
);
486 tevent_req_set_callback(subreq
, ctdb_msg_channel_got_msg
, req
);
490 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
)
492 struct tevent_req
*req
= tevent_req_callback_data(
493 subreq
, struct tevent_req
);
494 struct ctdb_msg_read_state
*state
= tevent_req_data(
495 req
, struct ctdb_msg_read_state
);
500 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
502 tevent_req_error(req
, err
);
505 state
->buflen
= nread
;
507 tevent_req_done(req
);
510 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
511 uint8_t **pmsg
, size_t *pmsg_len
)
513 struct ctdb_msg_read_state
*state
= tevent_req_data(
514 req
, struct ctdb_msg_read_state
);
515 struct ctdb_req_header
*hdr
;
516 struct ctdb_req_message
*msg
;
520 if (tevent_req_is_unix_error(req
, &err
)) {
524 hdr
= (struct ctdb_req_header
*)state
->buf
;
525 if (hdr
->length
!= state
->buflen
) {
526 DEBUG(10, ("Got invalid header length\n"));
529 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
530 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
531 CTDB_REQ_MESSAGE
, (int)hdr
->operation
));
534 if (hdr
->length
< offsetof(struct ctdb_req_message
, data
)) {
535 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr
->length
));
539 msg
= (struct ctdb_req_message
*)hdr
;
541 hdr
->length
- offsetof(struct ctdb_req_message
, data
)) {
542 DEBUG(10, ("Got invalid datalen %d\n", (int)msg
->datalen
));
546 buf
= (uint8_t *)talloc_memdup(mem_ctx
, msg
->data
, msg
->datalen
);
551 *pmsg_len
= msg
->datalen
;