4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "lib/util/tevent_unix.h"
30 #include "common/reqid.h"
31 #include "common/srvid.h"
32 #include "common/comm.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
45 struct ctdb_client_message_state
{
46 struct ctdb_client_context
*client
;
50 static int ctdb_client_message_state_destructor(
51 struct ctdb_client_message_state
*state
);
52 static void ctdb_client_message_done(struct tevent_req
*subreq
);
54 struct tevent_req
*ctdb_client_message_send(TALLOC_CTX
*mem_ctx
,
55 struct tevent_context
*ev
,
56 struct ctdb_client_context
*client
,
58 struct ctdb_req_message
*message
)
60 struct tevent_req
*req
, *subreq
;
61 struct ctdb_client_message_state
*state
;
62 struct ctdb_req_header h
;
65 size_t datalen
, buflen
;
68 req
= tevent_req_create(mem_ctx
, &state
,
69 struct ctdb_client_message_state
);
74 reqid
= reqid_new(client
->idr
, state
);
75 if (reqid
== REQID_INVALID
) {
80 state
->client
= client
;
83 talloc_set_destructor(state
, ctdb_client_message_state_destructor
);
85 ctdb_req_header_fill(&h
, 0, CTDB_REQ_MESSAGE
, destnode
,
88 datalen
= ctdb_req_message_len(&h
, message
);
89 ret
= ctdb_allocate_pkt(state
, datalen
, &buf
, &buflen
);
91 tevent_req_error(req
, ret
);
92 return tevent_req_post(req
, ev
);
95 ret
= ctdb_req_message_push(&h
, message
, buf
, &buflen
);
97 tevent_req_error(req
, ret
);
98 return tevent_req_post(req
, ev
);
101 subreq
= comm_write_send(state
, ev
, client
->comm
, buf
, buflen
);
102 if (tevent_req_nomem(subreq
, req
)) {
103 return tevent_req_post(req
, ev
);
105 tevent_req_set_callback(subreq
, ctdb_client_message_done
, req
);
110 static int ctdb_client_message_state_destructor(
111 struct ctdb_client_message_state
*state
)
113 reqid_remove(state
->client
->idr
, state
->reqid
);
117 static void ctdb_client_message_done(struct tevent_req
*subreq
)
119 struct tevent_req
*req
= tevent_req_callback_data(
120 subreq
, struct tevent_req
);
124 status
= comm_write_recv(subreq
, &ret
);
127 tevent_req_error(req
, ret
);
131 tevent_req_done(req
);
134 bool ctdb_client_message_recv(struct tevent_req
*req
, int *perr
)
138 if (tevent_req_is_unix_error(req
, &err
)) {
148 void ctdb_client_req_message(struct ctdb_client_context
*client
,
149 uint8_t *buf
, size_t buflen
, uint32_t reqid
)
151 struct ctdb_req_header h
;
152 struct ctdb_req_message_data message
;
153 TALLOC_CTX
*tmp_ctx
= talloc_new(client
);
156 ret
= ctdb_req_message_data_pull(buf
, buflen
, &h
, tmp_ctx
, &message
);
161 srvid_dispatch(client
->srv
, message
.srvid
, CTDB_SRVID_ALL
,
163 talloc_free(tmp_ctx
);
167 * Handle multiple nodes
170 struct ctdb_client_message_multi_state
{
178 struct message_index_state
{
179 struct tevent_req
*req
;
183 static void ctdb_client_message_multi_done(struct tevent_req
*subreq
);
185 struct tevent_req
*ctdb_client_message_multi_send(
187 struct tevent_context
*ev
,
188 struct ctdb_client_context
*client
,
189 uint32_t *pnn_list
, int count
,
190 struct ctdb_req_message
*message
)
192 struct tevent_req
*req
, *subreq
;
193 struct ctdb_client_message_multi_state
*state
;
196 if (pnn_list
== NULL
|| count
== 0) {
200 req
= tevent_req_create(mem_ctx
, &state
,
201 struct ctdb_client_message_multi_state
);
206 state
->pnn_list
= pnn_list
;
207 state
->count
= count
;
210 state
->err_list
= talloc_zero_array(state
, int, count
);
211 if (tevent_req_nomem(state
->err_list
, req
)) {
212 return tevent_req_post(req
, ev
);
215 for (i
=0; i
<count
; i
++) {
216 struct message_index_state
*substate
;
218 subreq
= ctdb_client_message_send(state
, ev
, client
,
219 pnn_list
[i
], message
);
220 if (tevent_req_nomem(subreq
, req
)) {
221 return tevent_req_post(req
, ev
);
224 substate
= talloc(subreq
, struct message_index_state
);
225 if (tevent_req_nomem(substate
, req
)) {
226 return tevent_req_post(req
, ev
);
232 tevent_req_set_callback(subreq
, ctdb_client_message_multi_done
,
239 static void ctdb_client_message_multi_done(struct tevent_req
*subreq
)
241 struct message_index_state
*substate
= tevent_req_callback_data(
242 subreq
, struct message_index_state
);
243 struct tevent_req
*req
= substate
->req
;
244 int idx
= substate
->index
;
245 struct ctdb_client_message_multi_state
*state
= tevent_req_data(
246 req
, struct ctdb_client_message_multi_state
);
250 status
= ctdb_client_message_recv(subreq
, &ret
);
253 if (state
->err
== 0) {
255 state
->err_list
[idx
] = state
->err
;
261 if (state
->done
== state
->count
) {
262 tevent_req_done(req
);
266 bool ctdb_client_message_multi_recv(struct tevent_req
*req
, int *perr
,
267 TALLOC_CTX
*mem_ctx
, int **perr_list
)
269 struct ctdb_client_message_multi_state
*state
= tevent_req_data(
270 req
, struct ctdb_client_message_multi_state
);
273 if (tevent_req_is_unix_error(req
, &err
)) {
277 if (perr_list
!= NULL
) {
278 *perr_list
= talloc_steal(mem_ctx
, state
->err_list
);
287 if (perr_list
!= NULL
) {
288 *perr_list
= talloc_steal(mem_ctx
, state
->err_list
);
291 if (state
->err
!= 0) {
299 * sync version of message send
302 int ctdb_client_message(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
303 struct ctdb_client_context
*client
,
304 uint32_t destnode
, struct ctdb_req_message
*message
)
307 struct tevent_req
*req
;
311 tmp_ctx
= talloc_new(client
);
312 if (tmp_ctx
== NULL
) {
316 req
= ctdb_client_message_send(tmp_ctx
, ev
, client
, destnode
, message
);
318 talloc_free(tmp_ctx
);
322 tevent_req_poll(req
, ev
);
324 status
= ctdb_client_message_recv(req
, &ret
);
326 talloc_free(tmp_ctx
);
330 talloc_free(tmp_ctx
);
334 struct ctdb_client_set_message_handler_state
{
335 struct ctdb_client_context
*client
;
337 srvid_handler_fn handler
;
341 static void ctdb_client_set_message_handler_done(struct tevent_req
*subreq
);
343 struct tevent_req
*ctdb_client_set_message_handler_send(
345 struct tevent_context
*ev
,
346 struct ctdb_client_context
*client
,
348 srvid_handler_fn handler
,
351 struct tevent_req
*req
, *subreq
;
352 struct ctdb_client_set_message_handler_state
*state
;
353 struct ctdb_req_control request
;
355 req
= tevent_req_create(mem_ctx
, &state
,
356 struct ctdb_client_set_message_handler_state
);
361 state
->client
= client
;
362 state
->srvid
= srvid
;
363 state
->handler
= handler
;
364 state
->private_data
= private_data
;
366 ctdb_req_control_register_srvid(&request
, srvid
);
367 subreq
= ctdb_client_control_send(state
, ev
, client
, client
->pnn
,
368 tevent_timeval_zero(), &request
);
369 if (tevent_req_nomem(subreq
, req
)) {
370 return tevent_req_post(req
, ev
);
372 tevent_req_set_callback(subreq
, ctdb_client_set_message_handler_done
,
378 static void ctdb_client_set_message_handler_done(struct tevent_req
*subreq
)
380 struct tevent_req
*req
= tevent_req_callback_data(
381 subreq
, struct tevent_req
);
382 struct ctdb_client_set_message_handler_state
*state
= tevent_req_data(
383 req
, struct ctdb_client_set_message_handler_state
);
384 struct ctdb_reply_control
*reply
;
388 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
391 tevent_req_error(req
, ret
);
395 ret
= ctdb_reply_control_register_srvid(reply
);
398 tevent_req_error(req
, ret
);
402 ret
= srvid_register(state
->client
->srv
, state
->client
, state
->srvid
,
403 state
->handler
, state
->private_data
);
405 tevent_req_error(req
, ret
);
409 tevent_req_done(req
);
412 bool ctdb_client_set_message_handler_recv(struct tevent_req
*req
, int *perr
)
416 if (tevent_req_is_unix_error(req
, &err
)) {
425 struct ctdb_client_remove_message_handler_state
{
426 struct ctdb_client_context
*client
;
431 static void ctdb_client_remove_message_handler_done(struct tevent_req
*subreq
);
433 struct tevent_req
*ctdb_client_remove_message_handler_send(
435 struct tevent_context
*ev
,
436 struct ctdb_client_context
*client
,
440 struct tevent_req
*req
, *subreq
;
441 struct ctdb_client_remove_message_handler_state
*state
;
442 struct ctdb_req_control request
;
444 req
= tevent_req_create(mem_ctx
, &state
,
445 struct ctdb_client_remove_message_handler_state
);
450 state
->client
= client
;
451 state
->srvid
= srvid
;
452 state
->private_data
= private_data
;
454 ctdb_req_control_deregister_srvid(&request
, srvid
);
455 subreq
= ctdb_client_control_send(state
, ev
, client
, client
->pnn
,
456 tevent_timeval_zero(), &request
);
457 if (tevent_req_nomem(subreq
, req
)) {
458 return tevent_req_post(req
, ev
);
460 tevent_req_set_callback(subreq
,
461 ctdb_client_remove_message_handler_done
, req
);
466 static void ctdb_client_remove_message_handler_done(struct tevent_req
*subreq
)
468 struct tevent_req
*req
= tevent_req_callback_data(
469 subreq
, struct tevent_req
);
470 struct ctdb_client_remove_message_handler_state
*state
= tevent_req_data(
471 req
, struct ctdb_client_remove_message_handler_state
);
472 struct ctdb_reply_control
*reply
;
476 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
479 tevent_req_error(req
, ret
);
483 ret
= ctdb_reply_control_deregister_srvid(reply
);
486 tevent_req_error(req
, ret
);
490 ret
= srvid_deregister(state
->client
->srv
, state
->srvid
,
491 state
->private_data
);
493 tevent_req_error(req
, ret
);
497 tevent_req_done(req
);
500 bool ctdb_client_remove_message_handler_recv(struct tevent_req
*req
, int *perr
)
504 if (tevent_req_is_unix_error(req
, &err
)) {
513 int ctdb_client_set_message_handler(struct tevent_context
*ev
,
514 struct ctdb_client_context
*client
,
515 uint64_t srvid
, srvid_handler_fn handler
,
521 mem_ctx
= talloc_new(client
);
522 if (mem_ctx
== NULL
) {
526 ret
= ctdb_ctrl_register_srvid(mem_ctx
, ev
, client
, client
->pnn
,
527 tevent_timeval_zero(), srvid
);
528 talloc_free(mem_ctx
);
533 return srvid_register(client
->srv
, client
, srvid
,
534 handler
, private_data
);
537 int ctdb_client_remove_message_handler(struct tevent_context
*ev
,
538 struct ctdb_client_context
*client
,
539 uint64_t srvid
, void *private_data
)
544 mem_ctx
= talloc_new(client
);
545 if (mem_ctx
== NULL
) {
549 ret
= ctdb_ctrl_deregister_srvid(mem_ctx
, ev
, client
, client
->pnn
,
550 tevent_timeval_zero(), srvid
);
551 talloc_free(mem_ctx
);
556 return srvid_deregister(client
->srv
, srvid
, private_data
);