4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "lib/util/tevent_unix.h"
30 #include "common/reqid.h"
31 #include "common/srvid.h"
32 #include "common/comm.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
45 struct ctdb_client_message_state
{
46 struct ctdb_client_context
*client
;
50 static int ctdb_client_message_state_destructor(
51 struct ctdb_client_message_state
*state
);
52 static void ctdb_client_message_done(struct tevent_req
*subreq
);
54 struct tevent_req
*ctdb_client_message_send(TALLOC_CTX
*mem_ctx
,
55 struct tevent_context
*ev
,
56 struct ctdb_client_context
*client
,
58 struct ctdb_req_message
*message
)
60 struct tevent_req
*req
, *subreq
;
61 struct ctdb_client_message_state
*state
;
62 struct ctdb_req_header h
;
68 req
= tevent_req_create(mem_ctx
, &state
,
69 struct ctdb_client_message_state
);
74 reqid
= reqid_new(client
->idr
, state
);
75 if (reqid
== REQID_INVALID
) {
80 state
->client
= client
;
83 talloc_set_destructor(state
, ctdb_client_message_state_destructor
);
85 ctdb_req_header_fill(&h
, 0, CTDB_REQ_MESSAGE
, destnode
,
88 ret
= ctdb_req_message_push(&h
, message
, state
, &buf
, &buflen
);
90 tevent_req_error(req
, ret
);
91 return tevent_req_post(req
, ev
);
94 subreq
= comm_write_send(state
, ev
, client
->comm
, buf
, buflen
);
95 if (tevent_req_nomem(subreq
, req
)) {
96 return tevent_req_post(req
, ev
);
98 tevent_req_set_callback(subreq
, ctdb_client_message_done
, req
);
103 static int ctdb_client_message_state_destructor(
104 struct ctdb_client_message_state
*state
)
106 reqid_remove(state
->client
->idr
, state
->reqid
);
110 static void ctdb_client_message_done(struct tevent_req
*subreq
)
112 struct tevent_req
*req
= tevent_req_callback_data(
113 subreq
, struct tevent_req
);
117 status
= comm_write_recv(subreq
, &ret
);
120 tevent_req_error(req
, ret
);
124 tevent_req_done(req
);
127 bool ctdb_client_message_recv(struct tevent_req
*req
, int *perr
)
131 if (tevent_req_is_unix_error(req
, &err
)) {
141 void ctdb_client_req_message(struct ctdb_client_context
*client
,
142 uint8_t *buf
, size_t buflen
, uint32_t reqid
)
144 struct ctdb_req_header h
;
145 struct ctdb_req_message_data message
;
146 TALLOC_CTX
*tmp_ctx
= talloc_new(client
);
149 ret
= ctdb_req_message_data_pull(buf
, buflen
, &h
, tmp_ctx
, &message
);
154 srvid_dispatch(client
->srv
, message
.srvid
, CTDB_SRVID_ALL
,
156 talloc_free(tmp_ctx
);
160 * Handle multiple nodes
163 struct ctdb_client_message_multi_state
{
171 struct message_index_state
{
172 struct tevent_req
*req
;
176 static void ctdb_client_message_multi_done(struct tevent_req
*subreq
);
178 struct tevent_req
*ctdb_client_message_multi_send(
180 struct tevent_context
*ev
,
181 struct ctdb_client_context
*client
,
182 uint32_t *pnn_list
, int count
,
183 struct ctdb_req_message
*message
)
185 struct tevent_req
*req
, *subreq
;
186 struct ctdb_client_message_multi_state
*state
;
189 if (pnn_list
== NULL
|| count
== 0) {
193 req
= tevent_req_create(mem_ctx
, &state
,
194 struct ctdb_client_message_multi_state
);
199 state
->pnn_list
= pnn_list
;
200 state
->count
= count
;
203 state
->err_list
= talloc_zero_array(state
, int, count
);
204 if (tevent_req_nomem(state
->err_list
, req
)) {
205 return tevent_req_post(req
, ev
);
208 for (i
=0; i
<count
; i
++) {
209 struct message_index_state
*substate
;
211 subreq
= ctdb_client_message_send(state
, ev
, client
,
212 pnn_list
[i
], message
);
213 if (tevent_req_nomem(subreq
, req
)) {
214 return tevent_req_post(req
, ev
);
217 substate
= talloc(subreq
, struct message_index_state
);
218 if (tevent_req_nomem(substate
, req
)) {
219 return tevent_req_post(req
, ev
);
225 tevent_req_set_callback(subreq
, ctdb_client_message_multi_done
,
232 static void ctdb_client_message_multi_done(struct tevent_req
*subreq
)
234 struct message_index_state
*substate
= tevent_req_callback_data(
235 subreq
, struct message_index_state
);
236 struct tevent_req
*req
= substate
->req
;
237 int idx
= substate
->index
;
238 struct ctdb_client_message_multi_state
*state
= tevent_req_data(
239 req
, struct ctdb_client_message_multi_state
);
243 status
= ctdb_client_message_recv(subreq
, &ret
);
246 if (state
->err
== 0) {
248 state
->err_list
[idx
] = state
->err
;
254 if (state
->done
== state
->count
) {
255 tevent_req_done(req
);
259 bool ctdb_client_message_multi_recv(struct tevent_req
*req
, int *perr
,
260 TALLOC_CTX
*mem_ctx
, int **perr_list
)
262 struct ctdb_client_message_multi_state
*state
= tevent_req_data(
263 req
, struct ctdb_client_message_multi_state
);
266 if (tevent_req_is_unix_error(req
, &err
)) {
270 if (perr_list
!= NULL
) {
271 *perr_list
= talloc_steal(mem_ctx
, state
->err_list
);
280 if (perr_list
!= NULL
) {
281 *perr_list
= talloc_steal(mem_ctx
, state
->err_list
);
284 if (state
->err
!= 0) {
292 * sync version of message send
295 int ctdb_client_message(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
296 struct ctdb_client_context
*client
,
297 uint32_t destnode
, struct ctdb_req_message
*message
)
300 struct tevent_req
*req
;
304 tmp_ctx
= talloc_new(client
);
305 if (tmp_ctx
== NULL
) {
309 req
= ctdb_client_message_send(tmp_ctx
, ev
, client
, destnode
, message
);
311 talloc_free(tmp_ctx
);
315 tevent_req_poll(req
, ev
);
317 status
= ctdb_client_message_recv(req
, &ret
);
319 talloc_free(tmp_ctx
);
323 talloc_free(tmp_ctx
);
327 struct ctdb_client_set_message_handler_state
{
328 struct ctdb_client_context
*client
;
330 srvid_handler_fn handler
;
334 static void ctdb_client_set_message_handler_done(struct tevent_req
*subreq
);
336 struct tevent_req
*ctdb_client_set_message_handler_send(
338 struct tevent_context
*ev
,
339 struct ctdb_client_context
*client
,
341 srvid_handler_fn handler
,
344 struct tevent_req
*req
, *subreq
;
345 struct ctdb_client_set_message_handler_state
*state
;
346 struct ctdb_req_control request
;
348 req
= tevent_req_create(mem_ctx
, &state
,
349 struct ctdb_client_set_message_handler_state
);
354 state
->client
= client
;
355 state
->srvid
= srvid
;
356 state
->handler
= handler
;
357 state
->private_data
= private_data
;
359 ctdb_req_control_register_srvid(&request
, srvid
);
360 subreq
= ctdb_client_control_send(state
, ev
, client
, client
->pnn
,
361 tevent_timeval_zero(), &request
);
362 if (tevent_req_nomem(subreq
, req
)) {
363 return tevent_req_post(req
, ev
);
365 tevent_req_set_callback(subreq
, ctdb_client_set_message_handler_done
,
371 static void ctdb_client_set_message_handler_done(struct tevent_req
*subreq
)
373 struct tevent_req
*req
= tevent_req_callback_data(
374 subreq
, struct tevent_req
);
375 struct ctdb_client_set_message_handler_state
*state
= tevent_req_data(
376 req
, struct ctdb_client_set_message_handler_state
);
377 struct ctdb_reply_control
*reply
;
381 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
384 tevent_req_error(req
, ret
);
388 ret
= ctdb_reply_control_register_srvid(reply
);
391 tevent_req_error(req
, ret
);
395 ret
= srvid_register(state
->client
->srv
, state
->client
, state
->srvid
,
396 state
->handler
, state
->private_data
);
398 tevent_req_error(req
, ret
);
402 tevent_req_done(req
);
405 bool ctdb_client_set_message_handler_recv(struct tevent_req
*req
, int *perr
)
409 if (tevent_req_is_unix_error(req
, &err
)) {
418 struct ctdb_client_remove_message_handler_state
{
419 struct ctdb_client_context
*client
;
424 static void ctdb_client_remove_message_handler_done(struct tevent_req
*subreq
);
426 struct tevent_req
*ctdb_client_remove_message_handler_send(
428 struct tevent_context
*ev
,
429 struct ctdb_client_context
*client
,
433 struct tevent_req
*req
, *subreq
;
434 struct ctdb_client_remove_message_handler_state
*state
;
435 struct ctdb_req_control request
;
437 req
= tevent_req_create(mem_ctx
, &state
,
438 struct ctdb_client_remove_message_handler_state
);
443 state
->client
= client
;
444 state
->srvid
= srvid
;
445 state
->private_data
= private_data
;
447 ctdb_req_control_deregister_srvid(&request
, srvid
);
448 subreq
= ctdb_client_control_send(state
, ev
, client
, client
->pnn
,
449 tevent_timeval_zero(), &request
);
450 if (tevent_req_nomem(subreq
, req
)) {
451 return tevent_req_post(req
, ev
);
453 tevent_req_set_callback(subreq
,
454 ctdb_client_remove_message_handler_done
, req
);
459 static void ctdb_client_remove_message_handler_done(struct tevent_req
*subreq
)
461 struct tevent_req
*req
= tevent_req_callback_data(
462 subreq
, struct tevent_req
);
463 struct ctdb_client_remove_message_handler_state
*state
= tevent_req_data(
464 req
, struct ctdb_client_remove_message_handler_state
);
465 struct ctdb_reply_control
*reply
;
469 status
= ctdb_client_control_recv(subreq
, &ret
, state
, &reply
);
472 tevent_req_error(req
, ret
);
476 ret
= ctdb_reply_control_deregister_srvid(reply
);
479 tevent_req_error(req
, ret
);
483 ret
= srvid_deregister(state
->client
->srv
, state
->srvid
,
484 state
->private_data
);
486 tevent_req_error(req
, ret
);
490 tevent_req_done(req
);
493 bool ctdb_client_remove_message_handler_recv(struct tevent_req
*req
, int *perr
)
497 if (tevent_req_is_unix_error(req
, &err
)) {
506 int ctdb_client_set_message_handler(TALLOC_CTX
*mem_ctx
,
507 struct tevent_context
*ev
,
508 struct ctdb_client_context
*client
,
509 uint64_t srvid
, srvid_handler_fn handler
,
514 ret
= ctdb_ctrl_register_srvid(mem_ctx
, ev
, client
, client
->pnn
,
515 tevent_timeval_zero(), srvid
);
520 return srvid_register(client
->srv
, client
, srvid
,
521 handler
, private_data
);
524 int ctdb_client_remove_message_handler(TALLOC_CTX
*mem_ctx
,
525 struct tevent_context
*ev
,
526 struct ctdb_client_context
*client
,
527 uint64_t srvid
, void *private_data
)
531 ret
= ctdb_ctrl_deregister_srvid(mem_ctx
, ev
, client
, client
->pnn
,
532 tevent_timeval_zero(), srvid
);
537 return srvid_deregister(client
->srv
, srvid
, private_data
);