4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "lib/util/tevent_unix.h"
30 #include "common/reqid.h"
31 #include "common/srvid.h"
32 #include "common/comm.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
45 struct ctdb_client_message_state
{
46 struct ctdb_client_context
*client
;
50 static int ctdb_client_message_state_destructor(
51 struct ctdb_client_message_state
*state
);
52 static void ctdb_client_message_done(struct tevent_req
*subreq
);
54 struct tevent_req
*ctdb_client_message_send(TALLOC_CTX
*mem_ctx
,
55 struct tevent_context
*ev
,
56 struct ctdb_client_context
*client
,
58 struct ctdb_req_message
*message
)
60 struct tevent_req
*req
, *subreq
;
61 struct ctdb_client_message_state
*state
;
62 struct ctdb_req_header h
;
65 size_t datalen
, buflen
;
68 req
= tevent_req_create(mem_ctx
, &state
,
69 struct ctdb_client_message_state
);
74 reqid
= reqid_new(client
->idr
, state
);
75 if (reqid
== REQID_INVALID
) {
80 state
->client
= client
;
83 talloc_set_destructor(state
, ctdb_client_message_state_destructor
);
85 ctdb_req_header_fill(&h
, 0, CTDB_REQ_MESSAGE
, destnode
,
88 datalen
= ctdb_req_message_len(&h
, message
);
89 ret
= ctdb_allocate_pkt(state
, datalen
, &buf
, &buflen
);
91 tevent_req_error(req
, ret
);
92 return tevent_req_post(req
, ev
);
95 ret
= ctdb_req_message_push(&h
, message
, buf
, &buflen
);
97 tevent_req_error(req
, ret
);
98 return tevent_req_post(req
, ev
);
101 subreq
= comm_write_send(state
, ev
, client
->comm
, buf
, buflen
);
102 if (tevent_req_nomem(subreq
, req
)) {
103 return tevent_req_post(req
, ev
);
105 tevent_req_set_callback(subreq
, ctdb_client_message_done
, req
);
110 static int ctdb_client_message_state_destructor(
111 struct ctdb_client_message_state
*state
)
113 reqid_remove(state
->client
->idr
, state
->reqid
);
117 static void ctdb_client_message_done(struct tevent_req
*subreq
)
119 struct tevent_req
*req
= tevent_req_callback_data(
120 subreq
, struct tevent_req
);
124 status
= comm_write_recv(subreq
, &ret
);
127 tevent_req_error(req
, ret
);
131 tevent_req_done(req
);
134 bool ctdb_client_message_recv(struct tevent_req
*req
, int *perr
)
138 if (tevent_req_is_unix_error(req
, &err
)) {
148 void ctdb_client_req_message(struct ctdb_client_context
*client
,
149 uint8_t *buf
, size_t buflen
, uint32_t reqid
)
151 struct ctdb_req_header h
;
152 struct ctdb_req_message_data message
;
153 TALLOC_CTX
*tmp_ctx
= talloc_new(client
);
156 ret
= ctdb_req_message_data_pull(buf
, buflen
, &h
, tmp_ctx
, &message
);
161 srvid_dispatch(client
->srv
, message
.srvid
, CTDB_SRVID_ALL
,
163 talloc_free(tmp_ctx
);
167 * Handle multiple nodes
170 struct ctdb_client_message_multi_state
{
178 struct message_index_state
{
179 struct tevent_req
*req
;
183 static void ctdb_client_message_multi_done(struct tevent_req
*subreq
);
185 struct tevent_req
*ctdb_client_message_multi_send(
187 struct tevent_context
*ev
,
188 struct ctdb_client_context
*client
,
189 uint32_t *pnn_list
, int count
,
190 struct ctdb_req_message
*message
)
192 struct tevent_req
*req
, *subreq
;
193 struct ctdb_client_message_multi_state
*state
;
196 if (pnn_list
== NULL
|| count
== 0) {
200 req
= tevent_req_create(mem_ctx
, &state
,
201 struct ctdb_client_message_multi_state
);
206 state
->pnn_list
= pnn_list
;
207 state
->count
= count
;
210 state
->err_list
= talloc_zero_array(state
, int, count
);
211 if (tevent_req_nomem(state
->err_list
, req
)) {
212 return tevent_req_post(req
, ev
);
215 for (i
=0; i
<count
; i
++) {
216 struct message_index_state
*substate
;
218 subreq
= ctdb_client_message_send(state
, ev
, client
,
219 pnn_list
[i
], message
);
220 if (tevent_req_nomem(subreq
, req
)) {
221 return tevent_req_post(req
, ev
);
224 substate
= talloc(subreq
, struct message_index_state
);
225 if (tevent_req_nomem(substate
, req
)) {
226 return tevent_req_post(req
, ev
);
232 tevent_req_set_callback(subreq
, ctdb_client_message_multi_done
,
239 static void ctdb_client_message_multi_done(struct tevent_req
*subreq
)
241 struct message_index_state
*substate
= tevent_req_callback_data(
242 subreq
, struct message_index_state
);
243 struct tevent_req
*req
= substate
->req
;
244 int idx
= substate
->index
;
245 struct ctdb_client_message_multi_state
*state
= tevent_req_data(
246 req
, struct ctdb_client_message_multi_state
);
250 status
= ctdb_client_message_recv(subreq
, &ret
);
253 if (state
->err
== 0) {
255 state
->err_list
[idx
] = state
->err
;
261 if (state
->done
== state
->count
) {
262 tevent_req_done(req
);
266 bool ctdb_client_message_multi_recv(struct tevent_req
*req
, int *perr
,
267 TALLOC_CTX
*mem_ctx
, int **perr_list
)
269 struct ctdb_client_message_multi_state
*state
= tevent_req_data(
270 req
, struct ctdb_client_message_multi_state
);
273 if (tevent_req_is_unix_error(req
, &err
)) {
277 if (perr_list
!= NULL
) {
278 *perr_list
= talloc_steal(mem_ctx
, state
->err_list
);
287 if (perr_list
!= NULL
) {
288 *perr_list
= talloc_steal(mem_ctx
, state
->err_list
);
291 if (state
->err
!= 0) {
299 * sync version of message send
302 int ctdb_client_message(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
303 struct ctdb_client_context
*client
,
304 uint32_t destnode
, struct ctdb_req_message
*message
)
307 struct tevent_req
*req
;
311 tmp_ctx
= talloc_new(client
);
312 if (tmp_ctx
== NULL
) {
316 req
= ctdb_client_message_send(tmp_ctx
, ev
, client
, destnode
, message
);
318 talloc_free(tmp_ctx
);
322 tevent_req_poll(req
, ev
);
324 status
= ctdb_client_message_recv(req
, &ret
);
326 talloc_free(tmp_ctx
);
330 talloc_free(tmp_ctx
);
334 int ctdb_client_message_multi(TALLOC_CTX
*mem_ctx
,
335 struct tevent_context
*ev
,
336 struct ctdb_client_context
*client
,
337 uint32_t *pnn_list
, int count
,
338 struct ctdb_req_message
*message
,
341 struct tevent_req
*req
;
345 req
= ctdb_client_message_multi_send(mem_ctx
, ev
, client
,
352 tevent_req_poll(req
, ev
);
354 status
= ctdb_client_message_multi_recv(req
, &ret
, mem_ctx
, perr_list
);
362 struct ctdb_client_set_message_handler_state
{
363 struct ctdb_client_context
*client
;
365 srvid_handler_fn handler
;
369 static void ctdb_client_set_message_handler_done(struct tevent_req
*subreq
);
371 struct tevent_req
*ctdb_client_set_message_handler_send(
373 struct tevent_context
*ev
,
374 struct ctdb_client_context
*client
,
376 srvid_handler_fn handler
,
379 struct tevent_req
*req
, *subreq
;
380 struct ctdb_client_set_message_handler_state
*state
;
381 struct ctdb_req_control request
;
383 req
= tevent_req_create(mem_ctx
, &state
,
384 struct ctdb_client_set_message_handler_state
);
389 state
->client
= client
;
390 state
->srvid
= srvid
;
391 state
->handler
= handler
;
392 state
->private_data
= private_data
;
394 ctdb_req_control_register_srvid(&request
, srvid
);
395 subreq
= ctdb_client_control_send(state
, ev
, client
, client
->pnn
,
396 tevent_timeval_zero(), &request
);
397 if (tevent_req_nomem(subreq
, req
)) {
398 return tevent_req_post(req
, ev
);
400 tevent_req_set_callback(subreq
, ctdb_client_set_message_handler_done
,
406 static void ctdb_client_set_message_handler_done(struct tevent_req
*subreq
)
408 struct tevent_req
*req
= tevent_req_callback_data(
409 subreq
, struct tevent_req
);
410 struct ctdb_client_set_message_handler_state
*state
= tevent_req_data(
411 req
, struct ctdb_client_set_message_handler_state
);
412 struct ctdb_reply_control
*reply
;
416 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
419 tevent_req_error(req
, ret
);
423 ret
= ctdb_reply_control_register_srvid(reply
);
426 tevent_req_error(req
, ret
);
430 ret
= srvid_register(state
->client
->srv
, state
->client
, state
->srvid
,
431 state
->handler
, state
->private_data
);
433 tevent_req_error(req
, ret
);
437 tevent_req_done(req
);
440 bool ctdb_client_set_message_handler_recv(struct tevent_req
*req
, int *perr
)
444 if (tevent_req_is_unix_error(req
, &err
)) {
453 struct ctdb_client_remove_message_handler_state
{
454 struct ctdb_client_context
*client
;
459 static void ctdb_client_remove_message_handler_done(struct tevent_req
*subreq
);
461 struct tevent_req
*ctdb_client_remove_message_handler_send(
463 struct tevent_context
*ev
,
464 struct ctdb_client_context
*client
,
468 struct tevent_req
*req
, *subreq
;
469 struct ctdb_client_remove_message_handler_state
*state
;
470 struct ctdb_req_control request
;
472 req
= tevent_req_create(mem_ctx
, &state
,
473 struct ctdb_client_remove_message_handler_state
);
478 state
->client
= client
;
479 state
->srvid
= srvid
;
480 state
->private_data
= private_data
;
482 ctdb_req_control_deregister_srvid(&request
, srvid
);
483 subreq
= ctdb_client_control_send(state
, ev
, client
, client
->pnn
,
484 tevent_timeval_zero(), &request
);
485 if (tevent_req_nomem(subreq
, req
)) {
486 return tevent_req_post(req
, ev
);
488 tevent_req_set_callback(subreq
,
489 ctdb_client_remove_message_handler_done
, req
);
494 static void ctdb_client_remove_message_handler_done(struct tevent_req
*subreq
)
496 struct tevent_req
*req
= tevent_req_callback_data(
497 subreq
, struct tevent_req
);
498 struct ctdb_client_remove_message_handler_state
*state
= tevent_req_data(
499 req
, struct ctdb_client_remove_message_handler_state
);
500 struct ctdb_reply_control
*reply
;
504 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
507 tevent_req_error(req
, ret
);
511 ret
= ctdb_reply_control_deregister_srvid(reply
);
514 tevent_req_error(req
, ret
);
518 ret
= srvid_deregister(state
->client
->srv
, state
->srvid
,
519 state
->private_data
);
521 tevent_req_error(req
, ret
);
525 tevent_req_done(req
);
528 bool ctdb_client_remove_message_handler_recv(struct tevent_req
*req
, int *perr
)
532 if (tevent_req_is_unix_error(req
, &err
)) {
541 int ctdb_client_set_message_handler(struct tevent_context
*ev
,
542 struct ctdb_client_context
*client
,
543 uint64_t srvid
, srvid_handler_fn handler
,
547 struct tevent_req
*req
;
551 mem_ctx
= talloc_new(client
);
552 if (mem_ctx
== NULL
) {
556 req
= ctdb_client_set_message_handler_send(mem_ctx
, ev
, client
,
560 talloc_free(mem_ctx
);
564 tevent_req_poll(req
, ev
);
566 status
= ctdb_client_set_message_handler_recv(req
, &ret
);
568 talloc_free(mem_ctx
);
572 talloc_free(mem_ctx
);
576 int ctdb_client_remove_message_handler(struct tevent_context
*ev
,
577 struct ctdb_client_context
*client
,
578 uint64_t srvid
, void *private_data
)
581 struct tevent_req
*req
;
585 mem_ctx
= talloc_new(client
);
586 if (mem_ctx
== NULL
) {
590 req
= ctdb_client_remove_message_handler_send(mem_ctx
, ev
, client
,
591 srvid
, private_data
);
593 talloc_free(mem_ctx
);
597 tevent_req_poll(req
, ev
);
599 status
= ctdb_client_remove_message_handler_recv(req
, &ret
);
601 talloc_free(mem_ctx
);
605 talloc_free(mem_ctx
);