/*
   Unix SMB/CIFS implementation.
   Samba3 ctdb connection handling
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
26 #include <ctdb_protocol.h>
28 #include "lib/async_req/async_sock.h"
/*
 * One connection to the ctdb daemon: the unix domain socket and the
 * tevent queue that serializes writes on it.
 */
struct ctdb_conn {
	int fd;
	struct tevent_queue *outqueue;
};
/*
 * State of a ctdb_conn_init_send request: the socket address being
 * connected to and the connection object under construction.
 */
struct ctdb_conn_init_state {
	struct sockaddr_un addr;
	struct ctdb_conn *conn;
};
/*
 * use the callbacks of async_connect_send to make sure
 * we are connecting to CTDB as root
 */
static void before_connect_cb(void *private_data) {
	/* private_data is unused; the caller passes NULL */
	become_root();
}
/*
 * Counterpart of before_connect_cb, run by async_connect_send once the
 * connect attempt has been issued: drop the root privileges again.
 */
static void after_connect_cb(void *private_data) {
	/* private_data is unused; the caller passes NULL */
	unbecome_root();
}
/* Forward declarations for the connection setup helpers below. */
static void ctdb_conn_init_done(struct tevent_req *subreq);
static int ctdb_conn_destructor(struct ctdb_conn *conn);
55 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
56 struct tevent_context
*ev
,
59 struct tevent_req
*req
, *subreq
;
60 struct ctdb_conn_init_state
*state
;
62 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_conn_init_state
);
67 if (!lp_clustering()) {
68 tevent_req_error(req
, ENOSYS
);
69 return tevent_req_post(req
, ev
);
72 if (strlen(sock
) >= sizeof(state
->addr
.sun_path
)) {
73 tevent_req_error(req
, ENAMETOOLONG
);
74 return tevent_req_post(req
, ev
);
77 state
->conn
= talloc(state
, struct ctdb_conn
);
78 if (tevent_req_nomem(state
->conn
, req
)) {
79 return tevent_req_post(req
, ev
);
82 state
->conn
->outqueue
= tevent_queue_create(
83 state
->conn
, "ctdb outqueue");
84 if (tevent_req_nomem(state
->conn
->outqueue
, req
)) {
85 return tevent_req_post(req
, ev
);
88 state
->conn
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
89 if (state
->conn
->fd
== -1) {
90 tevent_req_error(req
, errno
);
91 return tevent_req_post(req
, ev
);
93 talloc_set_destructor(state
->conn
, ctdb_conn_destructor
);
95 state
->addr
.sun_family
= AF_UNIX
;
96 strncpy(state
->addr
.sun_path
, sock
, sizeof(state
->addr
.sun_path
));
98 subreq
= async_connect_send(state
, ev
, state
->conn
->fd
,
99 (struct sockaddr
*)&state
->addr
,
100 sizeof(state
->addr
), before_connect_cb
,
101 after_connect_cb
, NULL
);
102 if (tevent_req_nomem(subreq
, req
)) {
103 return tevent_req_post(req
, ev
);
105 tevent_req_set_callback(subreq
, ctdb_conn_init_done
, req
);
109 static int ctdb_conn_destructor(struct ctdb_conn
*c
)
118 static void ctdb_conn_init_done(struct tevent_req
*subreq
)
120 struct tevent_req
*req
= tevent_req_callback_data(
121 subreq
, struct tevent_req
);
124 ret
= async_connect_recv(subreq
, &err
);
127 tevent_req_error(req
, err
);
130 tevent_req_done(req
);
133 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
134 struct ctdb_conn
**pconn
)
136 struct ctdb_conn_init_state
*state
= tevent_req_data(
137 req
, struct ctdb_conn_init_state
);
140 if (tevent_req_is_unix_error(req
, &err
)) {
143 *pconn
= talloc_move(mem_ctx
, &state
->conn
);
148 struct ctdb_conn_control_state
{
149 struct tevent_context
*ev
;
150 struct ctdb_conn
*conn
;
151 struct ctdb_req_control req
;
153 struct ctdb_reply_control
*reply
;
/* Forward declarations for the control request helpers below. */
static void ctdb_conn_control_written(struct tevent_req *subreq);
static void ctdb_conn_control_done(struct tevent_req *subreq);
static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p);
160 struct tevent_req
*ctdb_conn_control_send(TALLOC_CTX
*mem_ctx
,
161 struct tevent_context
*ev
,
162 struct ctdb_conn
*conn
,
163 uint32_t vnn
, uint32_t opcode
,
164 uint64_t srvid
, uint32_t flags
,
165 uint8_t *data
, size_t datalen
)
167 struct tevent_req
*req
, *subreq
;
168 struct ctdb_conn_control_state
*state
;
169 struct ctdb_req_header
*hdr
;
171 req
= tevent_req_create(mem_ctx
, &state
,
172 struct ctdb_conn_control_state
);
179 hdr
= &state
->req
.hdr
;
180 hdr
->length
= offsetof(struct ctdb_req_control
, data
) + datalen
;
181 hdr
->ctdb_magic
= CTDB_MAGIC
;
182 hdr
->ctdb_version
= CTDB_PROTOCOL
;
183 hdr
->operation
= CTDB_REQ_CONTROL
;
184 hdr
->reqid
= 1; /* FIXME */
186 state
->req
.opcode
= opcode
;
187 state
->req
.srvid
= srvid
;
188 state
->req
.datalen
= datalen
;
189 state
->req
.flags
= flags
;
191 state
->iov
[0].iov_base
= &state
->req
;
192 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_control
, data
);
193 state
->iov
[1].iov_base
= data
;
194 state
->iov
[1].iov_len
= datalen
;
196 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
198 if (tevent_req_nomem(subreq
, req
)) {
199 return tevent_req_post(req
, ev
);
201 tevent_req_set_callback(subreq
, ctdb_conn_control_written
, req
);
205 static void ctdb_conn_control_written(struct tevent_req
*subreq
)
207 struct tevent_req
*req
= tevent_req_callback_data(
208 subreq
, struct tevent_req
);
209 struct ctdb_conn_control_state
*state
= tevent_req_data(
210 req
, struct ctdb_conn_control_state
);
214 written
= writev_recv(subreq
, &err
);
217 tevent_req_error(req
, err
);
220 subreq
= read_packet_send(
221 state
, state
->ev
, state
->conn
->fd
, sizeof(uint32_t),
222 ctdb_packet_more
, NULL
);
223 if (tevent_req_nomem(subreq
, req
)) {
226 tevent_req_set_callback(subreq
, ctdb_conn_control_done
, req
);
/*
 * read_packet "more" callback for ctdb packets.
 *
 * A ctdb packet starts with a uint32_t holding the total packet length,
 * including the length word itself.
 *
 * buf, buflen: the bytes read so far
 * p:           unused private pointer
 *
 * Returns the number of bytes still to be read, 0 once more than the
 * length word is present, or -1 if the announced length is smaller
 * than the length word and the packet is therefore invalid.
 */
static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p)
{
	uint32_t len;

	if (buflen > sizeof(uint32_t)) {
		/* Been here, done */
		return 0;
	}
	/* buf is not necessarily aligned, so copy the length out */
	memcpy(&len, buf, sizeof(len));

	if (len < sizeof(uint32_t)) {
		return -1;
	}

	return (len - sizeof(uint32_t));
}
246 static void ctdb_conn_control_done(struct tevent_req
*subreq
)
248 struct tevent_req
*req
= tevent_req_callback_data(
249 subreq
, struct tevent_req
);
250 struct ctdb_conn_control_state
*state
= tevent_req_data(
251 req
, struct ctdb_conn_control_state
);
256 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
259 tevent_req_error(req
, err
);
262 state
->reply
= (struct ctdb_reply_control
*)buf
;
263 tevent_req_done(req
);
266 int ctdb_conn_control_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
267 struct ctdb_reply_control
**preply
)
269 struct ctdb_conn_control_state
*state
= tevent_req_data(
270 req
, struct ctdb_conn_control_state
);
273 if (tevent_req_is_unix_error(req
, &err
)) {
276 if (preply
!= NULL
) {
277 *preply
= talloc_move(mem_ctx
, &state
->reply
);
282 struct ctdb_conn_msg_write_state
{
283 struct ctdb_req_message ctdb_msg
;
/* Forward declaration of the write completion callback. */
static void ctdb_conn_msg_write_done(struct tevent_req *subreq);
289 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
290 struct tevent_context
*ev
,
291 struct ctdb_conn
*conn
,
292 uint32_t vnn
, uint64_t srvid
,
293 uint8_t *msg
, size_t msg_len
)
295 struct tevent_req
*req
, *subreq
;
296 struct ctdb_conn_msg_write_state
*state
;
297 struct ctdb_req_header
*h
;
299 req
= tevent_req_create(mem_ctx
, &state
,
300 struct ctdb_conn_msg_write_state
);
305 h
= &state
->ctdb_msg
.hdr
;
307 h
->length
= offsetof(struct ctdb_req_message
, data
) + msg_len
;
308 h
->ctdb_magic
= CTDB_MAGIC
;
309 h
->ctdb_version
= CTDB_PROTOCOL
;
311 h
->operation
= CTDB_REQ_MESSAGE
;
313 h
->srcnode
= CTDB_CURRENT_NODE
;
315 state
->ctdb_msg
.srvid
= srvid
;
316 state
->ctdb_msg
.datalen
= msg_len
;
318 state
->iov
[0].iov_base
= &state
->ctdb_msg
;
319 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_message
, data
);
320 state
->iov
[1].iov_base
= msg
;
321 state
->iov
[1].iov_len
= msg_len
;
323 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
325 if (tevent_req_nomem(subreq
, req
)) {
326 return tevent_req_post(req
, ev
);
328 tevent_req_set_callback(subreq
, ctdb_conn_msg_write_done
, req
);
332 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
)
334 struct tevent_req
*req
= tevent_req_callback_data(
335 subreq
, struct tevent_req
);
339 written
= writev_recv(subreq
, &err
);
342 tevent_req_error(req
, err
);
345 tevent_req_done(req
);
/*
 * Collect the result of ctdb_conn_msg_write_send; returns whatever
 * tevent_req_simple_recv_unix reports for the request.
 */
int ctdb_conn_msg_write_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
/*
 * A message channel: a ctdb connection on which a srvid has been
 * registered (see ctdb_msg_channel_init_send).
 */
struct ctdb_msg_channel {
	struct ctdb_conn *conn;
};
/*
 * State of a ctdb_msg_channel_init_send request: the connection being
 * set up, the srvid to register and the resulting channel.
 */
struct ctdb_msg_channel_init_state {
	struct tevent_context *ev;
	struct ctdb_conn *conn;
	uint64_t srvid;
	struct ctdb_msg_channel *channel;
};
/* Forward declarations for the two steps of the channel setup. */
static void ctdb_msg_channel_init_connected(struct tevent_req *subreq);
static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq);
367 struct tevent_req
*ctdb_msg_channel_init_send(
368 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
369 const char *sock
, uint64_t srvid
)
371 struct tevent_req
*req
, *subreq
;
372 struct ctdb_msg_channel_init_state
*state
;
374 req
= tevent_req_create(mem_ctx
, &state
,
375 struct ctdb_msg_channel_init_state
);
380 state
->srvid
= srvid
;
382 subreq
= ctdb_conn_init_send(state
, ev
, sock
);
383 if (tevent_req_nomem(subreq
, req
)) {
384 return tevent_req_post(req
, ev
);
386 tevent_req_set_callback(subreq
, ctdb_msg_channel_init_connected
, req
);
390 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
)
392 struct tevent_req
*req
= tevent_req_callback_data(
393 subreq
, struct tevent_req
);
394 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
395 req
, struct ctdb_msg_channel_init_state
);
398 ret
= ctdb_conn_init_recv(subreq
, state
, &state
->conn
);
400 if (tevent_req_error(req
, ret
)) {
403 subreq
= ctdb_conn_control_send(state
, state
->ev
, state
->conn
,
405 CTDB_CONTROL_REGISTER_SRVID
,
406 state
->srvid
, 0, NULL
, 0);
407 if (tevent_req_nomem(subreq
, req
)) {
410 tevent_req_set_callback(
411 subreq
, ctdb_msg_channel_init_registered_srvid
, req
);
414 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
)
416 struct tevent_req
*req
= tevent_req_callback_data(
417 subreq
, struct tevent_req
);
418 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
419 req
, struct ctdb_msg_channel_init_state
);
420 struct ctdb_reply_control
*reply
;
423 ret
= ctdb_conn_control_recv(subreq
, talloc_tos(), &reply
);
425 if (tevent_req_error(req
, ret
)) {
428 if (reply
->status
!= 0) {
429 tevent_req_error(req
, EIO
);
432 state
->channel
= talloc(state
, struct ctdb_msg_channel
);
433 if (tevent_req_nomem(state
->channel
, req
)) {
436 state
->channel
->conn
= talloc_move(state
->channel
, &state
->conn
);
437 tevent_req_done(req
);
440 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
441 struct ctdb_msg_channel
**pchannel
)
443 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
444 req
, struct ctdb_msg_channel_init_state
);
447 if (tevent_req_is_unix_error(req
, &err
)) {
450 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
/*
 * State of a ctdb_msg_read_send request: the raw packet read from the
 * channel and its length in bytes.
 */
struct ctdb_msg_read_state {
	size_t buflen;
	uint8_t *buf;
};
/* Forward declaration of the read completion callback. */
static void ctdb_msg_channel_got_msg(struct tevent_req *subreq);
461 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
462 struct tevent_context
*ev
,
463 struct ctdb_msg_channel
*channel
)
465 struct tevent_req
*req
, *subreq
;
466 struct ctdb_msg_read_state
*state
;
468 req
= tevent_req_create(mem_ctx
, &state
,
469 struct ctdb_msg_read_state
);
473 subreq
= read_packet_send(state
, ev
, channel
->conn
->fd
,
474 sizeof(uint32_t), ctdb_packet_more
, NULL
);
475 if (tevent_req_nomem(subreq
, req
)) {
476 return tevent_req_post(req
, ev
);
478 tevent_req_set_callback(subreq
, ctdb_msg_channel_got_msg
, req
);
482 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
)
484 struct tevent_req
*req
= tevent_req_callback_data(
485 subreq
, struct tevent_req
);
486 struct ctdb_msg_read_state
*state
= tevent_req_data(
487 req
, struct ctdb_msg_read_state
);
492 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
494 tevent_req_error(req
, err
);
497 state
->buflen
= nread
;
499 tevent_req_done(req
);
502 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
503 uint8_t **pmsg
, size_t *pmsg_len
)
505 struct ctdb_msg_read_state
*state
= tevent_req_data(
506 req
, struct ctdb_msg_read_state
);
507 struct ctdb_req_header
*hdr
;
508 struct ctdb_req_message
*msg
;
512 if (tevent_req_is_unix_error(req
, &err
)) {
516 hdr
= (struct ctdb_req_header
*)state
->buf
;
517 if (hdr
->length
!= state
->buflen
) {
518 DEBUG(10, ("Got invalid header length\n"));
521 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
522 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
523 CTDB_REQ_MESSAGE
, (int)hdr
->operation
));
526 if (hdr
->length
< offsetof(struct ctdb_req_message
, data
)) {
527 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr
->length
));
531 msg
= (struct ctdb_req_message
*)hdr
;
533 hdr
->length
- offsetof(struct ctdb_req_message
, data
)) {
534 DEBUG(10, ("Got invalid datalen %d\n", (int)msg
->datalen
));
538 buf
= (uint8_t *)talloc_memdup(mem_ctx
, msg
->data
, msg
->datalen
);
543 *pmsg_len
= msg
->datalen
;