ctdb-daemon: Schedule running of callback if there are no event scripts
[Samba.git] / ctdb / client / client_message.c
blobd35ee4c8925b0ab37d11ccd3d05efbca47e54bbe
1 /*
2 CTDB client code
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
28 #include "lib/util/tevent_unix.h"
30 #include "common/reqid.h"
31 #include "common/srvid.h"
32 #include "common/comm.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
42 * Handle REQ_MESSAGE
45 struct ctdb_client_message_state {
46 struct ctdb_client_context *client;
47 uint32_t reqid;
50 static int ctdb_client_message_state_destructor(
51 struct ctdb_client_message_state *state);
52 static void ctdb_client_message_done(struct tevent_req *subreq);
54 struct tevent_req *ctdb_client_message_send(TALLOC_CTX *mem_ctx,
55 struct tevent_context *ev,
56 struct ctdb_client_context *client,
57 uint32_t destnode,
58 struct ctdb_req_message *message)
60 struct tevent_req *req, *subreq;
61 struct ctdb_client_message_state *state;
62 struct ctdb_req_header h;
63 uint32_t reqid;
64 uint8_t *buf;
65 size_t datalen, buflen;
66 int ret;
68 req = tevent_req_create(mem_ctx, &state,
69 struct ctdb_client_message_state);
70 if (req == NULL) {
71 return NULL;
74 reqid = reqid_new(client->idr, state);
75 if (reqid == REQID_INVALID) {
76 talloc_free(req);
77 return NULL;
80 state->client = client;
81 state->reqid = reqid;
83 talloc_set_destructor(state, ctdb_client_message_state_destructor);
85 ctdb_req_header_fill(&h, 0, CTDB_REQ_MESSAGE, destnode,
86 client->pnn, reqid);
88 datalen = ctdb_req_message_len(&h, message);
89 ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
90 if (ret != 0) {
91 tevent_req_error(req, ret);
92 return tevent_req_post(req, ev);
95 ret = ctdb_req_message_push(&h, message, buf, &buflen);
96 if (ret != 0) {
97 tevent_req_error(req, ret);
98 return tevent_req_post(req, ev);
101 subreq = comm_write_send(state, ev, client->comm, buf, buflen);
102 if (tevent_req_nomem(subreq, req)) {
103 return tevent_req_post(req, ev);
105 tevent_req_set_callback(subreq, ctdb_client_message_done, req);
107 return req;
110 static int ctdb_client_message_state_destructor(
111 struct ctdb_client_message_state *state)
113 reqid_remove(state->client->idr, state->reqid);
114 return 0;
117 static void ctdb_client_message_done(struct tevent_req *subreq)
119 struct tevent_req *req = tevent_req_callback_data(
120 subreq, struct tevent_req);
121 int ret;
122 bool status;
124 status = comm_write_recv(subreq, &ret);
125 TALLOC_FREE(subreq);
126 if (! status) {
127 tevent_req_error(req, ret);
128 return;
131 tevent_req_done(req);
134 bool ctdb_client_message_recv(struct tevent_req *req, int *perr)
136 int err;
138 if (tevent_req_is_unix_error(req, &err)) {
139 if (perr != NULL) {
140 *perr = err;
142 return false;
145 return true;
148 void ctdb_client_req_message(struct ctdb_client_context *client,
149 uint8_t *buf, size_t buflen, uint32_t reqid)
151 struct ctdb_req_header h;
152 struct ctdb_req_message_data message;
153 TALLOC_CTX *tmp_ctx = talloc_new(client);
154 int ret;
156 ret = ctdb_req_message_data_pull(buf, buflen, &h, tmp_ctx, &message);
157 if (ret != 0) {
158 return;
161 srvid_dispatch(client->srv, message.srvid, CTDB_SRVID_ALL,
162 message.data);
163 talloc_free(tmp_ctx);
167 * Handle multiple nodes
170 struct ctdb_client_message_multi_state {
171 uint32_t *pnn_list;
172 int count;
173 int done;
174 int err;
175 int *err_list;
178 struct message_index_state {
179 struct tevent_req *req;
180 int index;
183 static void ctdb_client_message_multi_done(struct tevent_req *subreq);
185 struct tevent_req *ctdb_client_message_multi_send(
186 TALLOC_CTX *mem_ctx,
187 struct tevent_context *ev,
188 struct ctdb_client_context *client,
189 uint32_t *pnn_list, int count,
190 struct ctdb_req_message *message)
192 struct tevent_req *req, *subreq;
193 struct ctdb_client_message_multi_state *state;
194 int i;
196 if (pnn_list == NULL || count == 0) {
197 return NULL;
200 req = tevent_req_create(mem_ctx, &state,
201 struct ctdb_client_message_multi_state);
202 if (req == NULL) {
203 return NULL;
206 state->pnn_list = pnn_list;
207 state->count = count;
208 state->done = 0;
209 state->err = 0;
210 state->err_list = talloc_zero_array(state, int, count);
211 if (tevent_req_nomem(state->err_list, req)) {
212 return tevent_req_post(req, ev);
215 for (i=0; i<count; i++) {
216 struct message_index_state *substate;
218 subreq = ctdb_client_message_send(state, ev, client,
219 pnn_list[i], message);
220 if (tevent_req_nomem(subreq, req)) {
221 return tevent_req_post(req, ev);
224 substate = talloc(subreq, struct message_index_state);
225 if (tevent_req_nomem(substate, req)) {
226 return tevent_req_post(req, ev);
229 substate->req = req;
230 substate->index = i;
232 tevent_req_set_callback(subreq, ctdb_client_message_multi_done,
233 substate);
236 return req;
239 static void ctdb_client_message_multi_done(struct tevent_req *subreq)
241 struct message_index_state *substate = tevent_req_callback_data(
242 subreq, struct message_index_state);
243 struct tevent_req *req = substate->req;
244 int idx = substate->index;
245 struct ctdb_client_message_multi_state *state = tevent_req_data(
246 req, struct ctdb_client_message_multi_state);
247 bool status;
248 int ret;
250 status = ctdb_client_message_recv(subreq, &ret);
251 TALLOC_FREE(subreq);
252 if (! status) {
253 if (state->err == 0) {
254 state->err = ret;
255 state->err_list[idx] = state->err;
259 state->done += 1;
261 if (state->done == state->count) {
262 tevent_req_done(req);
266 bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr,
267 TALLOC_CTX *mem_ctx, int **perr_list)
269 struct ctdb_client_message_multi_state *state = tevent_req_data(
270 req, struct ctdb_client_message_multi_state);
271 int err;
273 if (tevent_req_is_unix_error(req, &err)) {
274 if (perr != NULL) {
275 *perr = err;
277 if (perr_list != NULL) {
278 *perr_list = talloc_steal(mem_ctx, state->err_list);
280 return false;
283 if (perr != NULL) {
284 *perr = state->err;
287 if (perr_list != NULL) {
288 *perr_list = talloc_steal(mem_ctx, state->err_list);
291 if (state->err != 0) {
292 return false;
295 return true;
299 * sync version of message send
302 int ctdb_client_message(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
303 struct ctdb_client_context *client,
304 uint32_t destnode, struct ctdb_req_message *message)
306 TALLOC_CTX *tmp_ctx;
307 struct tevent_req *req;
308 int ret;
309 bool status;
311 tmp_ctx = talloc_new(client);
312 if (tmp_ctx == NULL) {
313 return ENOMEM;
316 req = ctdb_client_message_send(tmp_ctx, ev, client, destnode, message);
317 if (req == NULL) {
318 talloc_free(tmp_ctx);
319 return ENOMEM;
322 tevent_req_poll(req, ev);
324 status = ctdb_client_message_recv(req, &ret);
325 if (! status) {
326 talloc_free(tmp_ctx);
327 return ret;
330 talloc_free(tmp_ctx);
331 return 0;
334 int ctdb_client_message_multi(TALLOC_CTX *mem_ctx,
335 struct tevent_context *ev,
336 struct ctdb_client_context *client,
337 uint32_t *pnn_list, int count,
338 struct ctdb_req_message *message,
339 int **perr_list)
341 struct tevent_req *req;
342 bool status;
343 int ret;
345 req = ctdb_client_message_multi_send(mem_ctx, ev, client,
346 pnn_list, count,
347 message);
348 if (req == NULL) {
349 return ENOMEM;
352 tevent_req_poll(req, ev);
354 status = ctdb_client_message_multi_recv(req, &ret, mem_ctx, perr_list);
355 if (! status) {
356 return ret;
359 return 0;
362 struct ctdb_client_set_message_handler_state {
363 struct ctdb_client_context *client;
364 uint64_t srvid;
365 srvid_handler_fn handler;
366 void *private_data;
369 static void ctdb_client_set_message_handler_done(struct tevent_req *subreq);
371 struct tevent_req *ctdb_client_set_message_handler_send(
372 TALLOC_CTX *mem_ctx,
373 struct tevent_context *ev,
374 struct ctdb_client_context *client,
375 uint64_t srvid,
376 srvid_handler_fn handler,
377 void *private_data)
379 struct tevent_req *req, *subreq;
380 struct ctdb_client_set_message_handler_state *state;
381 struct ctdb_req_control request;
383 req = tevent_req_create(mem_ctx, &state,
384 struct ctdb_client_set_message_handler_state);
385 if (req == NULL) {
386 return NULL;
389 state->client = client;
390 state->srvid = srvid;
391 state->handler = handler;
392 state->private_data = private_data;
394 ctdb_req_control_register_srvid(&request, srvid);
395 subreq = ctdb_client_control_send(state, ev, client, client->pnn,
396 tevent_timeval_zero(), &request);
397 if (tevent_req_nomem(subreq, req)) {
398 return tevent_req_post(req, ev);
400 tevent_req_set_callback(subreq, ctdb_client_set_message_handler_done,
401 req);
403 return req;
406 static void ctdb_client_set_message_handler_done(struct tevent_req *subreq)
408 struct tevent_req *req = tevent_req_callback_data(
409 subreq, struct tevent_req);
410 struct ctdb_client_set_message_handler_state *state = tevent_req_data(
411 req, struct ctdb_client_set_message_handler_state);
412 struct ctdb_reply_control *reply;
413 bool status;
414 int ret;
416 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
417 TALLOC_FREE(subreq);
418 if (! status) {
419 tevent_req_error(req, ret);
420 return;
423 ret = ctdb_reply_control_register_srvid(reply);
424 talloc_free(reply);
425 if (ret != 0) {
426 tevent_req_error(req, ret);
427 return;
430 ret = srvid_register(state->client->srv, state->client, state->srvid,
431 state->handler, state->private_data);
432 if (ret != 0) {
433 tevent_req_error(req, ret);
434 return;
437 tevent_req_done(req);
440 bool ctdb_client_set_message_handler_recv(struct tevent_req *req, int *perr)
442 int err;
444 if (tevent_req_is_unix_error(req, &err)) {
445 if (perr != NULL) {
446 *perr = err;
448 return false;
450 return true;
453 struct ctdb_client_remove_message_handler_state {
454 struct ctdb_client_context *client;
455 uint64_t srvid;
456 void *private_data;
459 static void ctdb_client_remove_message_handler_done(struct tevent_req *subreq);
461 struct tevent_req *ctdb_client_remove_message_handler_send(
462 TALLOC_CTX *mem_ctx,
463 struct tevent_context *ev,
464 struct ctdb_client_context *client,
465 uint64_t srvid,
466 void *private_data)
468 struct tevent_req *req, *subreq;
469 struct ctdb_client_remove_message_handler_state *state;
470 struct ctdb_req_control request;
472 req = tevent_req_create(mem_ctx, &state,
473 struct ctdb_client_remove_message_handler_state);
474 if (req == NULL) {
475 return NULL;
478 state->client = client;
479 state->srvid = srvid;
480 state->private_data = private_data;
482 ctdb_req_control_deregister_srvid(&request, srvid);
483 subreq = ctdb_client_control_send(state, ev, client, client->pnn,
484 tevent_timeval_zero(), &request);
485 if (tevent_req_nomem(subreq, req)) {
486 return tevent_req_post(req, ev);
488 tevent_req_set_callback(subreq,
489 ctdb_client_remove_message_handler_done, req);
491 return req;
494 static void ctdb_client_remove_message_handler_done(struct tevent_req *subreq)
496 struct tevent_req *req = tevent_req_callback_data(
497 subreq, struct tevent_req);
498 struct ctdb_client_remove_message_handler_state *state = tevent_req_data(
499 req, struct ctdb_client_remove_message_handler_state);
500 struct ctdb_reply_control *reply;
501 bool status;
502 int ret;
504 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
505 TALLOC_FREE(subreq);
506 if (! status) {
507 tevent_req_error(req, ret);
508 return;
511 ret = ctdb_reply_control_deregister_srvid(reply);
512 talloc_free(reply);
513 if (ret != 0) {
514 tevent_req_error(req, ret);
515 return;
518 ret = srvid_deregister(state->client->srv, state->srvid,
519 state->private_data);
520 if (ret != 0) {
521 tevent_req_error(req, ret);
522 return;
525 tevent_req_done(req);
528 bool ctdb_client_remove_message_handler_recv(struct tevent_req *req, int *perr)
530 int err;
532 if (tevent_req_is_unix_error(req, &err)) {
533 if (perr != NULL) {
534 *perr = err;
536 return false;
538 return true;
541 int ctdb_client_set_message_handler(struct tevent_context *ev,
542 struct ctdb_client_context *client,
543 uint64_t srvid, srvid_handler_fn handler,
544 void *private_data)
546 TALLOC_CTX *mem_ctx;
547 int ret;
549 mem_ctx = talloc_new(client);
550 if (mem_ctx == NULL) {
551 return ENOMEM;
554 ret = ctdb_ctrl_register_srvid(mem_ctx, ev, client, client->pnn,
555 tevent_timeval_zero(), srvid);
556 talloc_free(mem_ctx);
557 if (ret != 0) {
558 return ret;
561 return srvid_register(client->srv, client, srvid,
562 handler, private_data);
565 int ctdb_client_remove_message_handler(struct tevent_context *ev,
566 struct ctdb_client_context *client,
567 uint64_t srvid, void *private_data)
569 TALLOC_CTX *mem_ctx;
570 int ret;
572 mem_ctx = talloc_new(client);
573 if (mem_ctx == NULL) {
574 return ENOMEM;
577 ret = ctdb_ctrl_deregister_srvid(mem_ctx, ev, client, client->pnn,
578 tevent_timeval_zero(), srvid);
579 talloc_free(mem_ctx);
580 if (ret != 0) {
581 return ret;
584 return srvid_deregister(client->srv, srvid, private_data);