2 Unix SMB/CIFS implementation.
3 Samba3 ctdb connection handling
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
24 #ifdef CLUSTER_SUPPORT
26 #include "lib/async_req/async_sock.h"
30 struct tevent_queue
*outqueue
;
33 struct ctdb_conn_init_state
{
34 struct sockaddr_un addr
;
35 struct ctdb_conn
*conn
;
38 static void ctdb_conn_init_done(struct tevent_req
*subreq
);
39 static int ctdb_conn_destructor(struct ctdb_conn
*conn
);
41 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
42 struct tevent_context
*ev
,
45 struct tevent_req
*req
, *subreq
;
46 struct ctdb_conn_init_state
*state
;
48 req
= tevent_req_create(mem_ctx
, &state
, struct ctdb_conn_init_state
);
53 if (!lp_clustering()) {
54 tevent_req_error(req
, ENOSYS
);
55 return tevent_req_post(req
, ev
);
58 if (strlen(sock
) >= sizeof(state
->addr
.sun_path
)) {
59 tevent_req_error(req
, ENAMETOOLONG
);
60 return tevent_req_post(req
, ev
);
63 state
->conn
= talloc(state
, struct ctdb_conn
);
64 if (tevent_req_nomem(state
->conn
, req
)) {
65 return tevent_req_post(req
, ev
);
68 state
->conn
->outqueue
= tevent_queue_create(
69 state
->conn
, "ctdb outqueue");
70 if (tevent_req_nomem(state
->conn
->outqueue
, req
)) {
71 return tevent_req_post(req
, ev
);
74 state
->conn
->fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
75 if (state
->conn
->fd
== -1) {
76 tevent_req_error(req
, errno
);
77 return tevent_req_post(req
, ev
);
79 talloc_set_destructor(state
->conn
, ctdb_conn_destructor
);
81 state
->addr
.sun_family
= AF_UNIX
;
82 strncpy(state
->addr
.sun_path
, sock
, sizeof(state
->addr
.sun_path
));
84 subreq
= async_connect_send(state
, ev
, state
->conn
->fd
,
85 (struct sockaddr
*)&state
->addr
,
87 if (tevent_req_nomem(subreq
, req
)) {
88 return tevent_req_post(req
, ev
);
90 tevent_req_set_callback(subreq
, ctdb_conn_init_done
, req
);
94 static int ctdb_conn_destructor(struct ctdb_conn
*c
)
103 static void ctdb_conn_init_done(struct tevent_req
*subreq
)
105 struct tevent_req
*req
= tevent_req_callback_data(
106 subreq
, struct tevent_req
);
109 ret
= async_connect_recv(subreq
, &err
);
112 tevent_req_error(req
, err
);
115 tevent_req_done(req
);
118 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
119 struct ctdb_conn
**pconn
)
121 struct ctdb_conn_init_state
*state
= tevent_req_data(
122 req
, struct ctdb_conn_init_state
);
125 if (tevent_req_is_unix_error(req
, &err
)) {
128 *pconn
= talloc_move(mem_ctx
, &state
->conn
);
133 struct ctdb_conn_control_state
{
134 struct tevent_context
*ev
;
135 struct ctdb_conn
*conn
;
136 struct ctdb_req_control req
;
138 struct ctdb_reply_control
*reply
;
141 static void ctdb_conn_control_written(struct tevent_req
*subreq
);
142 static void ctdb_conn_control_done(struct tevent_req
*subreq
);
143 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
);
145 struct tevent_req
*ctdb_conn_control_send(TALLOC_CTX
*mem_ctx
,
146 struct tevent_context
*ev
,
147 struct ctdb_conn
*conn
,
148 uint32_t vnn
, uint32_t opcode
,
149 uint64_t srvid
, uint32_t flags
,
150 uint8_t *data
, size_t datalen
)
152 struct tevent_req
*req
, *subreq
;
153 struct ctdb_conn_control_state
*state
;
154 struct ctdb_req_header
*hdr
;
156 req
= tevent_req_create(mem_ctx
, &state
,
157 struct ctdb_conn_control_state
);
164 hdr
= &state
->req
.hdr
;
165 hdr
->length
= offsetof(struct ctdb_req_control
, data
) + datalen
;
166 hdr
->ctdb_magic
= CTDB_MAGIC
;
167 hdr
->ctdb_version
= CTDB_VERSION
;
168 hdr
->operation
= CTDB_REQ_CONTROL
;
169 hdr
->reqid
= 1; /* FIXME */
171 state
->req
.opcode
= opcode
;
172 state
->req
.srvid
= srvid
;
173 state
->req
.datalen
= datalen
;
174 state
->req
.flags
= flags
;
176 state
->iov
[0].iov_base
= &state
->req
;
177 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_control
, data
);
178 state
->iov
[1].iov_base
= data
;
179 state
->iov
[1].iov_len
= datalen
;
181 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
183 if (tevent_req_nomem(subreq
, req
)) {
184 return tevent_req_post(req
, ev
);
186 tevent_req_set_callback(subreq
, ctdb_conn_control_written
, req
);
190 static void ctdb_conn_control_written(struct tevent_req
*subreq
)
192 struct tevent_req
*req
= tevent_req_callback_data(
193 subreq
, struct tevent_req
);
194 struct ctdb_conn_control_state
*state
= tevent_req_data(
195 req
, struct ctdb_conn_control_state
);
199 written
= writev_recv(subreq
, &err
);
202 tevent_req_error(req
, err
);
205 subreq
= read_packet_send(
206 state
, state
->ev
, state
->conn
->fd
, sizeof(uint32_t),
207 ctdb_packet_more
, NULL
);
208 if (tevent_req_nomem(subreq
, req
)) {
211 tevent_req_set_callback(subreq
, ctdb_conn_control_done
, req
);
214 static ssize_t
ctdb_packet_more(uint8_t *buf
, size_t buflen
, void *p
)
218 if (buflen
> sizeof(uint32_t)) {
219 /* Been here, done */
222 memcpy(&len
, buf
, sizeof(len
));
223 return (len
- sizeof(uint32_t));
226 static void ctdb_conn_control_done(struct tevent_req
*subreq
)
228 struct tevent_req
*req
= tevent_req_callback_data(
229 subreq
, struct tevent_req
);
230 struct ctdb_conn_control_state
*state
= tevent_req_data(
231 req
, struct ctdb_conn_control_state
);
236 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
239 tevent_req_error(req
, err
);
242 state
->reply
= (struct ctdb_reply_control
*)buf
;
243 tevent_req_done(req
);
246 int ctdb_conn_control_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
247 struct ctdb_reply_control
**preply
)
249 struct ctdb_conn_control_state
*state
= tevent_req_data(
250 req
, struct ctdb_conn_control_state
);
253 if (tevent_req_is_unix_error(req
, &err
)) {
256 if (preply
!= NULL
) {
257 *preply
= talloc_move(mem_ctx
, &state
->reply
);
262 struct ctdb_conn_msg_write_state
{
263 struct ctdb_req_message ctdb_msg
;
267 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
);
269 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
270 struct tevent_context
*ev
,
271 struct ctdb_conn
*conn
,
272 uint32_t vnn
, uint64_t srvid
,
273 uint8_t *msg
, size_t msg_len
)
275 struct tevent_req
*req
, *subreq
;
276 struct ctdb_conn_msg_write_state
*state
;
277 struct ctdb_req_header
*h
;
279 req
= tevent_req_create(mem_ctx
, &state
,
280 struct ctdb_conn_msg_write_state
);
285 h
= &state
->ctdb_msg
.hdr
;
287 h
->length
= offsetof(struct ctdb_req_message
, data
) + msg_len
;
288 h
->ctdb_magic
= CTDB_MAGIC
;
289 h
->ctdb_version
= CTDB_VERSION
;
291 h
->operation
= CTDB_REQ_MESSAGE
;
293 h
->srcnode
= CTDB_CURRENT_NODE
;
295 state
->ctdb_msg
.srvid
= srvid
;
296 state
->ctdb_msg
.datalen
= msg_len
;
298 state
->iov
[0].iov_base
= &state
->ctdb_msg
;
299 state
->iov
[0].iov_len
= offsetof(struct ctdb_req_message
, data
);
300 state
->iov
[1].iov_base
= msg
;
301 state
->iov
[1].iov_len
= msg_len
;
303 subreq
= writev_send(state
, ev
, conn
->outqueue
, conn
->fd
, false,
305 if (tevent_req_nomem(subreq
, req
)) {
306 return tevent_req_post(req
, ev
);
308 tevent_req_set_callback(subreq
, ctdb_conn_msg_write_done
, req
);
312 static void ctdb_conn_msg_write_done(struct tevent_req
*subreq
)
314 struct tevent_req
*req
= tevent_req_callback_data(
315 subreq
, struct tevent_req
);
319 written
= writev_recv(subreq
, &err
);
322 tevent_req_error(req
, err
);
325 tevent_req_done(req
);
328 int ctdb_conn_msg_write_recv(struct tevent_req
*req
)
331 if (tevent_req_is_unix_error(req
, &err
)) {
337 struct ctdb_msg_channel
{
338 struct ctdb_conn
*conn
;
341 struct ctdb_msg_channel_init_state
{
342 struct tevent_context
*ev
;
343 struct ctdb_conn
*conn
;
345 struct ctdb_msg_channel
*channel
;
348 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
);
349 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
);
351 struct tevent_req
*ctdb_msg_channel_init_send(
352 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
353 const char *sock
, uint64_t srvid
)
355 struct tevent_req
*req
, *subreq
;
356 struct ctdb_msg_channel_init_state
*state
;
358 req
= tevent_req_create(mem_ctx
, &state
,
359 struct ctdb_msg_channel_init_state
);
364 state
->srvid
= srvid
;
366 subreq
= ctdb_conn_init_send(state
, ev
, sock
);
367 if (tevent_req_nomem(subreq
, req
)) {
368 return tevent_req_post(req
, ev
);
370 tevent_req_set_callback(subreq
, ctdb_msg_channel_init_connected
, req
);
374 static void ctdb_msg_channel_init_connected(struct tevent_req
*subreq
)
376 struct tevent_req
*req
= tevent_req_callback_data(
377 subreq
, struct tevent_req
);
378 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
379 req
, struct ctdb_msg_channel_init_state
);
382 ret
= ctdb_conn_init_recv(subreq
, state
, &state
->conn
);
384 if (tevent_req_error(req
, ret
)) {
387 subreq
= ctdb_conn_control_send(state
, state
->ev
, state
->conn
,
389 CTDB_CONTROL_REGISTER_SRVID
,
390 state
->srvid
, 0, NULL
, 0);
391 if (tevent_req_nomem(subreq
, req
)) {
394 tevent_req_set_callback(
395 subreq
, ctdb_msg_channel_init_registered_srvid
, req
);
398 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req
*subreq
)
400 struct tevent_req
*req
= tevent_req_callback_data(
401 subreq
, struct tevent_req
);
402 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
403 req
, struct ctdb_msg_channel_init_state
);
404 struct ctdb_reply_control
*reply
;
407 ret
= ctdb_conn_control_recv(subreq
, talloc_tos(), &reply
);
409 if (tevent_req_error(req
, ret
)) {
412 if (reply
->status
!= 0) {
413 tevent_req_error(req
, EIO
);
416 state
->channel
= talloc(state
, struct ctdb_msg_channel
);
417 if (tevent_req_nomem(state
->channel
, req
)) {
420 state
->channel
->conn
= talloc_move(state
->channel
, &state
->conn
);
421 tevent_req_done(req
);
424 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
425 struct ctdb_msg_channel
**pchannel
)
427 struct ctdb_msg_channel_init_state
*state
= tevent_req_data(
428 req
, struct ctdb_msg_channel_init_state
);
431 if (tevent_req_is_unix_error(req
, &err
)) {
434 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
438 struct ctdb_msg_read_state
{
443 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
);
445 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
446 struct tevent_context
*ev
,
447 struct ctdb_msg_channel
*channel
)
449 struct tevent_req
*req
, *subreq
;
450 struct ctdb_msg_read_state
*state
;
452 req
= tevent_req_create(mem_ctx
, &state
,
453 struct ctdb_msg_read_state
);
457 subreq
= read_packet_send(state
, ev
, channel
->conn
->fd
,
458 sizeof(uint32_t), ctdb_packet_more
, NULL
);
459 if (tevent_req_nomem(subreq
, req
)) {
460 return tevent_req_post(req
, ev
);
462 tevent_req_set_callback(subreq
, ctdb_msg_channel_got_msg
, req
);
466 static void ctdb_msg_channel_got_msg(struct tevent_req
*subreq
)
468 struct tevent_req
*req
= tevent_req_callback_data(
469 subreq
, struct tevent_req
);
470 struct ctdb_msg_read_state
*state
= tevent_req_data(
471 req
, struct ctdb_msg_read_state
);
476 nread
= read_packet_recv(subreq
, state
, &buf
, &err
);
478 tevent_req_error(req
, err
);
481 state
->buflen
= nread
;
483 tevent_req_done(req
);
486 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
487 uint8_t **pmsg
, size_t *pmsg_len
)
489 struct ctdb_msg_read_state
*state
= tevent_req_data(
490 req
, struct ctdb_msg_read_state
);
491 struct ctdb_req_header
*hdr
;
492 struct ctdb_req_message
*msg
;
496 if (tevent_req_is_unix_error(req
, &err
)) {
500 hdr
= (struct ctdb_req_header
*)state
->buf
;
501 if (hdr
->length
!= state
->buflen
) {
502 DEBUG(10, ("Got invalid header length\n"));
505 if (hdr
->operation
!= CTDB_REQ_MESSAGE
) {
506 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
507 CTDB_REQ_MESSAGE
, (int)hdr
->operation
));
510 if (hdr
->length
< offsetof(struct ctdb_req_message
, data
)) {
511 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr
->length
));
515 msg
= (struct ctdb_req_message
*)hdr
;
517 hdr
->length
- offsetof(struct ctdb_req_message
, data
)) {
518 DEBUG(10, ("Got invalid datalen %d\n", (int)msg
->datalen
));
522 buf
= (uint8_t *)talloc_memdup(mem_ctx
, msg
->data
, msg
->datalen
);
527 *pmsg_len
= msg
->datalen
;
537 static struct tevent_req
*dummy_send(TALLOC_CTX
*mem_ctx
,
538 struct tevent_context
*ev
)
540 struct tevent_req
*req
;
541 struct dummy_state
*state
;
542 req
= tevent_req_create(mem_ctx
, &state
, struct dummy_state
);
546 tevent_req_done(req
);
547 return tevent_req_post(req
, ev
);
550 struct tevent_req
*ctdb_conn_init_send(TALLOC_CTX
*mem_ctx
,
551 struct tevent_context
*ev
,
554 return dummy_send(mem_ctx
, ev
);
557 int ctdb_conn_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
558 struct ctdb_conn
**pconn
)
563 struct tevent_req
*ctdb_conn_msg_write_send(TALLOC_CTX
*mem_ctx
,
564 struct tevent_context
*ev
,
565 struct ctdb_conn
*conn
,
566 uint32_t vnn
, uint64_t srvid
,
567 uint8_t *msg
, size_t msg_len
)
569 return dummy_send(mem_ctx
, ev
);
572 int ctdb_conn_msg_write_recv(struct tevent_req
*req
)
577 struct tevent_req
*ctdb_msg_channel_init_send(
578 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
579 const char *sock
, uint64_t srvid
)
581 return dummy_send(mem_ctx
, ev
);
584 int ctdb_msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
585 struct ctdb_msg_channel
**pchannel
)
590 struct tevent_req
*ctdb_msg_read_send(TALLOC_CTX
*mem_ctx
,
591 struct tevent_context
*ev
,
592 struct ctdb_msg_channel
*channel
)
594 return dummy_send(mem_ctx
, ev
);
597 int ctdb_msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
598 uint8_t **pmsg
, size_t *pmsg_len
)