4 Copyright (C) Amitay Isaacs 2016
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
27 #include "lib/util/tevent_unix.h"
29 #include "common/reqid.h"
30 #include "common/srvid.h"
31 #include "common/comm.h"
33 #include "protocol/protocol.h"
34 #include "protocol/protocol_api.h"
36 #include "client/client_private.h"
37 #include "client/client.h"
40 struct ctdb_tunnel_data
{
41 struct ctdb_req_header hdr
;
42 struct ctdb_req_tunnel
*tunnel
;
47 * Tunnel setup and destroy
50 struct ctdb_tunnel_setup_state
{
51 struct ctdb_client_context
*client
;
52 struct ctdb_tunnel_context
*tctx
;
56 static void ctdb_tunnel_setup_register_done(struct tevent_req
*subreq
);
57 static void ctdb_tunnel_handler(uint64_t tunnel_id
, TDB_DATA data
,
60 struct tevent_req
*ctdb_tunnel_setup_send(TALLOC_CTX
*mem_ctx
,
61 struct tevent_context
*ev
,
62 struct ctdb_client_context
*client
,
64 ctdb_tunnel_callback_func_t callback
,
67 struct tevent_req
*req
, *subreq
;
68 struct ctdb_tunnel_setup_state
*state
;
69 struct ctdb_tunnel_context
*tctx
;
70 struct ctdb_req_control request
;
73 req
= tevent_req_create(mem_ctx
, &state
,
74 struct ctdb_tunnel_setup_state
);
79 tctx
= talloc_zero(client
, struct ctdb_tunnel_context
);
80 if (tevent_req_nomem(tctx
, req
)) {
81 return tevent_req_post(req
, ev
);
84 tctx
->client
= client
;
85 tctx
->tunnel_id
= tunnel_id
;
86 tctx
->callback
= callback
;
87 tctx
->private_data
= private_data
;
89 state
->client
= client
;
90 state
->tunnel_id
= tunnel_id
;
93 ret
= srvid_exists(client
->tunnels
, tunnel_id
, NULL
);
95 tevent_req_error(req
, EEXIST
);
96 return tevent_req_post(req
, ev
);
99 ctdb_req_control_tunnel_register(&request
, tunnel_id
);
100 subreq
= ctdb_client_control_send(state
, ev
, client
,
101 ctdb_client_pnn(client
),
102 tevent_timeval_zero(),
104 if (tevent_req_nomem(subreq
, req
)) {
105 return tevent_req_post(req
, ev
);
107 tevent_req_set_callback(subreq
, ctdb_tunnel_setup_register_done
, req
);
112 static void ctdb_tunnel_setup_register_done(struct tevent_req
*subreq
)
114 struct tevent_req
*req
= tevent_req_callback_data(
115 subreq
, struct tevent_req
);
116 struct ctdb_tunnel_setup_state
*state
= tevent_req_data(
117 req
, struct ctdb_tunnel_setup_state
);
118 struct ctdb_reply_control
*reply
;
122 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
125 tevent_req_error(req
, ret
);
129 ret
= ctdb_reply_control_tunnel_register(reply
);
132 tevent_req_error(req
, ret
);
136 ret
= srvid_register(state
->client
->tunnels
, state
->client
,
138 ctdb_tunnel_handler
, state
->tctx
);
140 tevent_req_error(req
, ret
);
144 tevent_req_done(req
);
147 static void ctdb_tunnel_handler(uint64_t tunnel_id
, TDB_DATA data
,
150 struct ctdb_tunnel_context
*tctx
= talloc_get_type_abort(
151 private_data
, struct ctdb_tunnel_context
);
152 struct ctdb_tunnel_data
*tunnel_data
;
154 if (tctx
->tunnel_id
!= tunnel_id
) {
158 if (data
.dsize
!= sizeof(struct ctdb_tunnel_data
)) {
162 tunnel_data
= (struct ctdb_tunnel_data
*)data
.dptr
;
164 tctx
->callback(tctx
, tunnel_data
->hdr
.srcnode
, tunnel_data
->reqid
,
165 tunnel_data
->tunnel
->data
.dptr
,
166 tunnel_data
->tunnel
->data
.dsize
, tctx
->private_data
);
169 bool ctdb_tunnel_setup_recv(struct tevent_req
*req
, int *perr
,
170 struct ctdb_tunnel_context
**result
)
172 struct ctdb_tunnel_setup_state
*state
= tevent_req_data(
173 req
, struct ctdb_tunnel_setup_state
);
176 if (tevent_req_is_unix_error(req
, &ret
)) {
183 *result
= state
->tctx
;
187 int ctdb_tunnel_setup(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
188 struct ctdb_client_context
*client
, uint64_t tunnel_id
,
189 ctdb_tunnel_callback_func_t callback
, void *private_data
,
190 struct ctdb_tunnel_context
**result
)
192 struct tevent_req
*req
;
196 req
= ctdb_tunnel_setup_send(mem_ctx
, ev
, client
, tunnel_id
,
197 callback
, private_data
);
202 tevent_req_poll(req
, ev
);
204 status
= ctdb_tunnel_setup_recv(req
, &ret
, result
);
213 struct ctdb_tunnel_destroy_state
{
214 struct ctdb_tunnel_context
*tctx
;
217 static void ctdb_tunnel_destroy_deregister_done(struct tevent_req
*subreq
);
219 struct tevent_req
*ctdb_tunnel_destroy_send(TALLOC_CTX
*mem_ctx
,
220 struct tevent_context
*ev
,
221 struct ctdb_tunnel_context
*tctx
)
223 struct tevent_req
*req
, *subreq
;
224 struct ctdb_tunnel_destroy_state
*state
;
225 struct ctdb_req_control request
;
227 req
= tevent_req_create(mem_ctx
, &state
,
228 struct ctdb_tunnel_destroy_state
);
235 ctdb_req_control_tunnel_deregister(&request
, tctx
->tunnel_id
);
236 subreq
= ctdb_client_control_send(state
, ev
, tctx
->client
,
237 ctdb_client_pnn(tctx
->client
),
238 tevent_timeval_zero(),
240 if (tevent_req_nomem(subreq
, req
)) {
241 return tevent_req_post(req
, ev
);
243 tevent_req_set_callback(subreq
, ctdb_tunnel_destroy_deregister_done
,
249 static void ctdb_tunnel_destroy_deregister_done(struct tevent_req
*subreq
)
251 struct tevent_req
*req
= tevent_req_callback_data(
252 subreq
, struct tevent_req
);
253 struct ctdb_tunnel_destroy_state
*state
= tevent_req_data(
254 req
, struct ctdb_tunnel_destroy_state
);
255 struct ctdb_client_context
*client
= state
->tctx
->client
;
256 struct ctdb_reply_control
*reply
;
260 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
263 tevent_req_error(req
, ret
);
267 ret
= ctdb_reply_control_tunnel_deregister(reply
);
270 tevent_req_error(req
, ret
);
274 ret
= srvid_deregister(client
->tunnels
, state
->tctx
->tunnel_id
,
277 tevent_req_error(req
, ret
);
281 tevent_req_done(req
);
284 bool ctdb_tunnel_destroy_recv(struct tevent_req
*req
, int *perr
)
288 if (tevent_req_is_unix_error(req
, &ret
)) {
298 int ctdb_tunnel_destroy(struct tevent_context
*ev
,
299 struct ctdb_tunnel_context
*tctx
)
301 struct tevent_req
*req
;
305 req
= ctdb_tunnel_destroy_send(ev
, ev
, tctx
);
310 tevent_req_poll(req
, ev
);
312 status
= ctdb_tunnel_destroy_recv(req
, &ret
);
322 * Callback when REQ_TUNNEL packet is received
325 static void ctdb_tunnel_request_reply(struct tevent_req
*req
,
326 struct ctdb_tunnel_data
*tunnel_data
);
328 void ctdb_client_req_tunnel(struct ctdb_client_context
*client
,
329 uint8_t *buf
, size_t buflen
, uint32_t reqid
)
331 TALLOC_CTX
*tmp_ctx
= talloc_new(client
);
332 struct ctdb_req_header h
;
333 struct ctdb_req_tunnel
*tunnel
;
334 struct tevent_req
*req
;
335 struct ctdb_tunnel_data tunnel_data
;
338 tunnel
= talloc_zero(tmp_ctx
, struct ctdb_req_tunnel
);
339 if (tunnel
== NULL
) {
343 ret
= ctdb_req_tunnel_pull(buf
, buflen
, &h
, tmp_ctx
, tunnel
);
348 tunnel_data
= (struct ctdb_tunnel_data
) {
354 if (tunnel
->flags
& CTDB_TUNNEL_FLAG_REPLY
) {
355 req
= reqid_find(client
->idr
, reqid
, struct tevent_req
);
360 ctdb_tunnel_request_reply(req
, &tunnel_data
);
362 } else if (tunnel
->flags
& CTDB_TUNNEL_FLAG_REQUEST
) {
365 .dsize
= sizeof(struct ctdb_tunnel_data
),
366 .dptr
= (uint8_t *)&tunnel_data
,
369 srvid_dispatch(client
->tunnels
, tunnel
->tunnel_id
, 0, data
);
373 TALLOC_FREE(tmp_ctx
);
378 * Send messages using tunnel
381 struct ctdb_tunnel_request_state
{
382 struct ctdb_tunnel_context
*tctx
;
385 struct ctdb_req_tunnel
*tunnel
;
388 static int ctdb_tunnel_request_state_destructor(
389 struct ctdb_tunnel_request_state
*state
);
390 static void ctdb_tunnel_request_done(struct tevent_req
*subreq
);
392 struct tevent_req
*ctdb_tunnel_request_send(TALLOC_CTX
*mem_ctx
,
393 struct tevent_context
*ev
,
394 struct ctdb_tunnel_context
*tctx
,
396 struct timeval timeout
,
397 uint8_t *buf
, size_t buflen
,
400 struct tevent_req
*req
, *subreq
;
401 struct ctdb_tunnel_request_state
*state
;
402 struct ctdb_req_tunnel tunnel
;
403 struct ctdb_req_header h
;
405 size_t datalen
, pkt_len
;
408 req
= tevent_req_create(mem_ctx
, &state
,
409 struct ctdb_tunnel_request_state
);
415 state
->wait_for_reply
= wait_for_reply
;
416 state
->reqid
= reqid_new(tctx
->client
->idr
, req
);
417 if (state
->reqid
== REQID_INVALID
) {
422 talloc_set_destructor(state
, ctdb_tunnel_request_state_destructor
);
424 tunnel
= (struct ctdb_req_tunnel
) {
425 .tunnel_id
= state
->tctx
->tunnel_id
,
426 .flags
= CTDB_TUNNEL_FLAG_REQUEST
,
433 if (destnode
== CTDB_BROADCAST_ALL
||
434 destnode
== CTDB_BROADCAST_ACTIVE
||
435 destnode
== CTDB_BROADCAST_CONNECTED
) {
436 state
->wait_for_reply
= false;
438 if (! state
->wait_for_reply
) {
439 tunnel
.flags
|= CTDB_TUNNEL_FLAG_NOREPLY
;
442 ctdb_req_header_fill(&h
, 0, CTDB_REQ_TUNNEL
, destnode
,
443 ctdb_client_pnn(state
->tctx
->client
),
446 datalen
= ctdb_req_tunnel_len(&h
, &tunnel
);
447 ret
= ctdb_allocate_pkt(state
, datalen
, &pkt
, &pkt_len
);
449 tevent_req_error(req
, ret
);
450 return tevent_req_post(req
, ev
);
453 ret
= ctdb_req_tunnel_push(&h
, &tunnel
, pkt
, &pkt_len
);
455 tevent_req_error(req
, ret
);
456 return tevent_req_post(req
, ev
);
459 if (!tevent_timeval_is_zero(&timeout
)) {
460 if (!tevent_req_set_endtime(req
, ev
, timeout
)) {
461 return tevent_req_post(req
, ev
);
465 subreq
= comm_write_send(state
, ev
, tctx
->client
->comm
,
467 if (tevent_req_nomem(subreq
, req
)) {
468 return tevent_req_post(req
, ev
);
470 tevent_req_set_callback(subreq
, ctdb_tunnel_request_done
, req
);
475 static int ctdb_tunnel_request_state_destructor(
476 struct ctdb_tunnel_request_state
*state
)
478 reqid_remove(state
->tctx
->client
->idr
, state
->reqid
);
482 static void ctdb_tunnel_request_done(struct tevent_req
*subreq
)
484 struct tevent_req
*req
= tevent_req_callback_data(
485 subreq
, struct tevent_req
);
486 struct ctdb_tunnel_request_state
*state
= tevent_req_data(
487 req
, struct ctdb_tunnel_request_state
);
491 status
= comm_write_recv(subreq
, &ret
);
494 tevent_req_error(req
, ret
);
498 if (! state
->wait_for_reply
) {
499 tevent_req_done(req
);
502 /* Wait for the reply or timeout */
505 static void ctdb_tunnel_request_reply(struct tevent_req
*req
,
506 struct ctdb_tunnel_data
*tunnel_data
)
508 struct ctdb_tunnel_request_state
*state
= tevent_req_data(
509 req
, struct ctdb_tunnel_request_state
);
511 if (tunnel_data
->reqid
!= state
->reqid
) {
515 state
->tunnel
= talloc_steal(state
, tunnel_data
->tunnel
);
516 tevent_req_done(req
);
519 bool ctdb_tunnel_request_recv(struct tevent_req
*req
, int *perr
,
520 TALLOC_CTX
*mem_ctx
, uint8_t **buf
,
523 struct ctdb_tunnel_request_state
*state
= tevent_req_data(
524 req
, struct ctdb_tunnel_request_state
);
527 if (tevent_req_is_unix_error(req
, &ret
)) {
534 if (state
->wait_for_reply
) {
536 *buf
= talloc_steal(mem_ctx
, state
->tunnel
->data
.dptr
);
538 if (buflen
!= NULL
) {
539 *buflen
= state
->tunnel
->data
.dsize
;
546 int ctdb_tunnel_request(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
547 struct ctdb_tunnel_context
*tctx
, int destnode
,
548 struct timeval timeout
, uint8_t *buf
, size_t buflen
,
551 struct tevent_req
*req
;
555 req
= ctdb_tunnel_request_send(mem_ctx
, ev
, tctx
, destnode
,
556 timeout
, buf
, buflen
, wait_for_reply
);
561 tevent_req_poll(req
, ev
);
563 status
= ctdb_tunnel_request_recv(req
, &ret
, NULL
, NULL
, NULL
);
572 struct ctdb_tunnel_reply_state
{
575 static void ctdb_tunnel_reply_done(struct tevent_req
*subreq
);
577 struct tevent_req
*ctdb_tunnel_reply_send(TALLOC_CTX
*mem_ctx
,
578 struct tevent_context
*ev
,
579 struct ctdb_tunnel_context
*tctx
,
580 int destnode
, uint32_t reqid
,
581 struct timeval timeout
,
582 uint8_t *buf
, size_t buflen
)
584 struct tevent_req
*req
, *subreq
;
585 struct ctdb_tunnel_reply_state
*state
;
586 struct ctdb_req_tunnel tunnel
;
587 struct ctdb_req_header h
;
589 size_t datalen
, pkt_len
;
592 req
= tevent_req_create(mem_ctx
, &state
,
593 struct ctdb_tunnel_reply_state
);
598 tunnel
= (struct ctdb_req_tunnel
) {
599 .tunnel_id
= tctx
->tunnel_id
,
600 .flags
= CTDB_TUNNEL_FLAG_REPLY
,
607 ctdb_req_header_fill(&h
, 0, CTDB_REQ_TUNNEL
, destnode
,
608 ctdb_client_pnn(tctx
->client
), reqid
);
610 datalen
= ctdb_req_tunnel_len(&h
, &tunnel
);
611 ret
= ctdb_allocate_pkt(state
, datalen
, &pkt
, &pkt_len
);
613 tevent_req_error(req
, ret
);
614 return tevent_req_post(req
, ev
);
617 ret
= ctdb_req_tunnel_push(&h
, &tunnel
, pkt
, &pkt_len
);
619 tevent_req_error(req
, ret
);
620 return tevent_req_post(req
, ev
);
623 if (!tevent_timeval_is_zero(&timeout
)) {
624 if (!tevent_req_set_endtime(req
, ev
, timeout
)) {
625 return tevent_req_post(req
, ev
);
629 subreq
= comm_write_send(state
, ev
, tctx
->client
->comm
, pkt
, pkt_len
);
630 if (tevent_req_nomem(subreq
, req
)) {
631 return tevent_req_post(req
, ev
);
633 tevent_req_set_callback(subreq
, ctdb_tunnel_reply_done
, req
);
638 static void ctdb_tunnel_reply_done(struct tevent_req
*subreq
)
640 struct tevent_req
*req
= tevent_req_callback_data(
641 subreq
, struct tevent_req
);
645 status
= comm_write_recv(subreq
, &ret
);
648 tevent_req_error(req
, ret
);
652 tevent_req_done(req
);
655 bool ctdb_tunnel_reply_recv(struct tevent_req
*req
, int *perr
)
659 if (tevent_req_is_unix_error(req
, &ret
)) {
669 int ctdb_tunnel_reply(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
670 struct ctdb_tunnel_context
*tctx
, int destnode
,
671 uint32_t reqid
, struct timeval timeout
,
672 uint8_t *buf
, size_t buflen
)
674 struct tevent_req
*req
;
678 req
= ctdb_tunnel_reply_send(mem_ctx
, ev
, tctx
, destnode
, reqid
,
679 timeout
, buf
, buflen
);
684 tevent_req_poll(req
, ev
);
686 status
= ctdb_tunnel_reply_recv(req
, &ret
);