Use a custom init function for samba4 that sets a samba4
[Samba.git] / source4 / cluster / ctdb / server / ctdb_daemon.c
blobf96cd86916bdb7c89f12c35268dca0ba06daddc1
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb.h"
28 #include "../include/ctdb_private.h"
30 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
33 handler for when a node changes its flags
35 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid,
36 TDB_DATA data, void *private_data)
38 struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
40 if (data.dsize != sizeof(*c) || !ctdb_validate_vnn(ctdb, c->vnn)) {
41 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
42 return;
45 if (!ctdb_validate_vnn(ctdb, c->vnn)) {
46 DEBUG(0,("Bad vnn %u in flag_change_handler\n", c->vnn));
47 return;
50 /* don't get the disconnected flag from the other node */
51 ctdb->nodes[c->vnn]->flags =
52 (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED)
53 | (c->flags & ~NODE_FLAGS_DISCONNECTED);
54 DEBUG(2,("Node flags for node %u are now 0x%x\n", c->vnn, ctdb->nodes[c->vnn]->flags));
56 /* make sure we don't hold any IPs when we shouldn't */
57 if (c->vnn == ctdb->vnn &&
58 (ctdb->nodes[c->vnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
59 ctdb_release_all_ips(ctdb);
63 /* called when the "startup" event script has finished */
64 static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
66 if (status != 0) {
67 DEBUG(0,("startup event failed!\n"));
68 ctdb_fatal(ctdb, "startup event script failed");
71 /* start the transport running */
72 if (ctdb->methods->start(ctdb) != 0) {
73 DEBUG(0,("transport failed to start!\n"));
74 ctdb_fatal(ctdb, "transport failed to start");
77 /* start the recovery daemon process */
78 if (ctdb_start_recoverd(ctdb) != 0) {
79 DEBUG(0,("Failed to start recovery daemon\n"));
80 exit(11);
83 /* a handler for when nodes are disabled/enabled */
84 ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED,
85 flag_change_handler, NULL);
87 /* start monitoring for dead nodes */
88 ctdb_start_monitoring(ctdb);
91 /* go into main ctdb loop */
92 static void ctdb_main_loop(struct ctdb_context *ctdb)
94 int ret = -1;
96 if (strcmp(ctdb->transport, "tcp") == 0) {
97 int ctdb_tcp_init(struct ctdb_context *);
98 ret = ctdb_tcp_init(ctdb);
100 #ifdef USE_INFINIBAND
101 if (strcmp(ctdb->transport, "ib") == 0) {
102 int ctdb_ibw_init(struct ctdb_context *);
103 ret = ctdb_ibw_init(ctdb);
105 #endif
106 if (ret != 0) {
107 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
108 return;
111 /* initialise the transport */
112 if (ctdb->methods->initialise(ctdb) != 0) {
113 DEBUG(0,("transport failed to initialise!\n"));
114 ctdb_fatal(ctdb, "transport failed to initialise");
117 /* tell all other nodes we've just started up */
118 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
119 0, CTDB_CONTROL_STARTUP, 0,
120 CTDB_CTRL_FLAG_NOREPLY,
121 tdb_null, NULL, NULL);
123 /* release any IPs we hold from previous runs of the daemon */
124 ctdb_release_all_ips(ctdb);
126 ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb,
127 ctdb_start_transport, NULL, "startup");
128 if (ret != 0) {
129 DEBUG(0,("Failed startup event script\n"));
130 return;
133 /* go into a wait loop to allow other nodes to complete */
134 event_loop_wait(ctdb->ev);
136 DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
137 exit(1);
141 static void block_signal(int signum)
143 struct sigaction act;
145 memset(&act, 0, sizeof(act));
147 act.sa_handler = SIG_IGN;
148 sigemptyset(&act.sa_mask);
149 sigaddset(&act.sa_mask, signum);
150 sigaction(signum, &act, NULL);
155 send a packet to a client
157 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
159 client->ctdb->statistics.client_packets_sent++;
160 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
164 message handler for when we are in daemon mode. This redirects the message
165 to the right client
167 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
168 TDB_DATA data, void *private_data)
170 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
171 struct ctdb_req_message *r;
172 int len;
174 /* construct a message to send to the client containing the data */
175 len = offsetof(struct ctdb_req_message, data) + data.dsize;
176 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
177 len, struct ctdb_req_message);
178 CTDB_NO_MEMORY_VOID(ctdb, r);
180 talloc_set_name_const(r, "req_message packet");
182 r->srvid = srvid;
183 r->datalen = data.dsize;
184 memcpy(&r->data[0], data.dptr, data.dsize);
186 daemon_queue_send(client, &r->hdr);
188 talloc_free(r);
193 this is called when the ctdb daemon received a ctdb request to
194 set the srvid from the client
196 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
199 int res;
200 if (client == NULL) {
201 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
202 return -1;
204 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
205 if (res != 0) {
206 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n",
207 (unsigned long long)srvid));
208 } else {
209 DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n",
210 (unsigned long long)srvid));
213 /* this is a hack for Samba - we now know the pid of the Samba client */
214 if ((srvid & 0xFFFFFFFF) == srvid &&
215 kill(srvid, 0) == 0) {
216 client->pid = srvid;
217 DEBUG(0,(__location__ " Registered PID %u for client %u\n",
218 (unsigned)client->pid, client_id));
220 return res;
224 this is called when the ctdb daemon received a ctdb request to
225 remove a srvid from the client
227 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
229 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
230 if (client == NULL) {
231 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
232 return -1;
234 return ctdb_deregister_message_handler(ctdb, srvid, client);
239 destroy a ctdb_client
241 static int ctdb_client_destructor(struct ctdb_client *client)
243 ctdb_takeover_client_destructor_hook(client);
244 ctdb_reqid_remove(client->ctdb, client->client_id);
245 client->ctdb->statistics.num_clients--;
246 return 0;
251 this is called when the ctdb daemon received a ctdb request message
252 from a local client over the unix domain socket
254 static void daemon_request_message_from_client(struct ctdb_client *client,
255 struct ctdb_req_message *c)
257 TDB_DATA data;
258 int res;
260 /* maybe the message is for another client on this node */
261 if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
262 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
263 return;
266 /* its for a remote node */
267 data.dptr = &c->data[0];
268 data.dsize = c->datalen;
269 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
270 c->srvid, data);
271 if (res != 0) {
272 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
273 c->hdr.destnode));
278 struct daemon_call_state {
279 struct ctdb_client *client;
280 uint32_t reqid;
281 struct ctdb_call *call;
282 struct timeval start_time;
286 complete a call from a client
288 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
290 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
291 struct daemon_call_state);
292 struct ctdb_reply_call *r;
293 int res;
294 uint32_t length;
295 struct ctdb_client *client = dstate->client;
297 talloc_steal(client, dstate);
298 talloc_steal(dstate, dstate->call);
300 res = ctdb_daemon_call_recv(state, dstate->call);
301 if (res != 0) {
302 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
303 client->ctdb->statistics.pending_calls--;
304 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
305 return;
308 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
309 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
310 length, struct ctdb_reply_call);
311 if (r == NULL) {
312 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
313 client->ctdb->statistics.pending_calls--;
314 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
315 return;
317 r->hdr.reqid = dstate->reqid;
318 r->datalen = dstate->call->reply_data.dsize;
319 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
321 res = daemon_queue_send(client, &r->hdr);
322 if (res != 0) {
323 DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
325 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
326 talloc_free(dstate);
327 client->ctdb->statistics.pending_calls--;
331 static void daemon_request_call_from_client(struct ctdb_client *client,
332 struct ctdb_req_call *c);
335 this is called when the ctdb daemon received a ctdb request call
336 from a local client over the unix domain socket
338 static void daemon_request_call_from_client(struct ctdb_client *client,
339 struct ctdb_req_call *c)
341 struct ctdb_call_state *state;
342 struct ctdb_db_context *ctdb_db;
343 struct daemon_call_state *dstate;
344 struct ctdb_call *call;
345 struct ctdb_ltdb_header header;
346 TDB_DATA key, data;
347 int ret;
348 struct ctdb_context *ctdb = client->ctdb;
350 ctdb->statistics.total_calls++;
351 ctdb->statistics.pending_calls++;
353 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
354 if (!ctdb_db) {
355 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
356 c->db_id));
357 ctdb->statistics.pending_calls--;
358 return;
361 key.dptr = c->data;
362 key.dsize = c->keylen;
364 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
365 (struct ctdb_req_header *)c, &data,
366 daemon_incoming_packet, client, True);
367 if (ret == -2) {
368 /* will retry later */
369 ctdb->statistics.pending_calls--;
370 return;
373 if (ret != 0) {
374 DEBUG(0,(__location__ " Unable to fetch record\n"));
375 ctdb->statistics.pending_calls--;
376 return;
379 dstate = talloc(client, struct daemon_call_state);
380 if (dstate == NULL) {
381 ctdb_ltdb_unlock(ctdb_db, key);
382 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
383 ctdb->statistics.pending_calls--;
384 return;
386 dstate->start_time = timeval_current();
387 dstate->client = client;
388 dstate->reqid = c->hdr.reqid;
389 talloc_steal(dstate, data.dptr);
391 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
392 if (call == NULL) {
393 ctdb_ltdb_unlock(ctdb_db, key);
394 DEBUG(0,(__location__ " Unable to allocate call\n"));
395 ctdb->statistics.pending_calls--;
396 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
397 return;
400 call->call_id = c->callid;
401 call->key = key;
402 call->call_data.dptr = c->data + c->keylen;
403 call->call_data.dsize = c->calldatalen;
404 call->flags = c->flags;
406 if (header.dmaster == ctdb->vnn) {
407 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
408 } else {
409 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
412 ctdb_ltdb_unlock(ctdb_db, key);
414 if (state == NULL) {
415 DEBUG(0,(__location__ " Unable to setup call send\n"));
416 ctdb->statistics.pending_calls--;
417 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
418 return;
420 talloc_steal(state, dstate);
421 talloc_steal(client, state);
423 state->async.fn = daemon_call_from_client_callback;
424 state->async.private_data = dstate;
428 static void daemon_request_control_from_client(struct ctdb_client *client,
429 struct ctdb_req_control *c);
431 /* data contains a packet from the client */
432 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
434 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
435 TALLOC_CTX *tmp_ctx;
436 struct ctdb_context *ctdb = client->ctdb;
438 /* place the packet as a child of a tmp_ctx. We then use
439 talloc_free() below to free it. If any of the calls want
440 to keep it, then they will steal it somewhere else, and the
441 talloc_free() will be a no-op */
442 tmp_ctx = talloc_new(client);
443 talloc_steal(tmp_ctx, hdr);
445 if (hdr->ctdb_magic != CTDB_MAGIC) {
446 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
447 goto done;
450 if (hdr->ctdb_version != CTDB_VERSION) {
451 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
452 goto done;
455 switch (hdr->operation) {
456 case CTDB_REQ_CALL:
457 ctdb->statistics.client.req_call++;
458 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
459 break;
461 case CTDB_REQ_MESSAGE:
462 ctdb->statistics.client.req_message++;
463 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
464 break;
466 case CTDB_REQ_CONTROL:
467 ctdb->statistics.client.req_control++;
468 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
469 break;
471 default:
472 DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
473 hdr->operation));
476 done:
477 talloc_free(tmp_ctx);
481 called when the daemon gets a incoming packet
483 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
485 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
486 struct ctdb_req_header *hdr;
488 if (cnt == 0) {
489 talloc_free(client);
490 return;
493 client->ctdb->statistics.client_packets_recv++;
495 if (cnt < sizeof(*hdr)) {
496 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
497 (unsigned)cnt);
498 return;
500 hdr = (struct ctdb_req_header *)data;
501 if (cnt != hdr->length) {
502 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
503 (unsigned)hdr->length, (unsigned)cnt);
504 return;
507 if (hdr->ctdb_magic != CTDB_MAGIC) {
508 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
509 return;
512 if (hdr->ctdb_version != CTDB_VERSION) {
513 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
514 return;
517 DEBUG(3,(__location__ " client request %u of type %u length %u from "
518 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
519 hdr->srcnode, hdr->destnode));
521 /* it is the responsibility of the incoming packet function to free 'data' */
522 daemon_incoming_packet(client, hdr);
525 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
526 uint16_t flags, void *private_data)
528 struct sockaddr_in addr;
529 socklen_t len;
530 int fd;
531 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
532 struct ctdb_client *client;
534 memset(&addr, 0, sizeof(addr));
535 len = sizeof(addr);
536 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
537 if (fd == -1) {
538 return;
541 set_nonblocking(fd);
542 set_close_on_exec(fd);
544 client = talloc_zero(ctdb, struct ctdb_client);
545 client->ctdb = ctdb;
546 client->fd = fd;
547 client->client_id = ctdb_reqid_new(ctdb, client);
548 ctdb->statistics.num_clients++;
550 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
551 ctdb_daemon_read_cb, client);
553 talloc_set_destructor(client, ctdb_client_destructor);
559 create a unix domain socket and bind it
560 return a file descriptor open on the socket
562 static int ux_socket_bind(struct ctdb_context *ctdb)
564 struct sockaddr_un addr;
566 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
567 if (ctdb->daemon.sd == -1) {
568 return -1;
571 set_nonblocking(ctdb->daemon.sd);
572 set_close_on_exec(ctdb->daemon.sd);
574 #if 0
575 /* AIX doesn't like this :( */
576 if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
577 fchmod(ctdb->daemon.sd, 0700) != 0) {
578 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
579 goto failed;
581 #endif
583 set_nonblocking(ctdb->daemon.sd);
585 memset(&addr, 0, sizeof(addr));
586 addr.sun_family = AF_UNIX;
587 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
589 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
590 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
591 goto failed;
593 if (listen(ctdb->daemon.sd, 10) != 0) {
594 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
595 goto failed;
598 return 0;
600 failed:
601 close(ctdb->daemon.sd);
602 ctdb->daemon.sd = -1;
603 return -1;
607 delete the socket on exit - called on destruction of autofree context
609 static int unlink_destructor(const char *name)
611 unlink(name);
612 return 0;
617 start the protocol going as a daemon
619 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
621 int res;
622 struct fd_event *fde;
623 const char *domain_socket_name;
625 /* get rid of any old sockets */
626 unlink(ctdb->daemon.name);
628 /* create a unix domain stream socket to listen to */
629 res = ux_socket_bind(ctdb);
630 if (res!=0) {
631 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
632 exit(10);
635 if (do_fork && fork()) {
636 return 0;
639 tdb_reopen_all(False);
641 if (do_fork) {
642 setsid();
644 block_signal(SIGPIPE);
646 /* try to set us up as realtime */
647 ctdb_set_realtime(true);
649 /* ensure the socket is deleted on exit of the daemon */
650 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
651 talloc_set_destructor(domain_socket_name, unlink_destructor);
653 ctdb->ev = s4_event_context_init(NULL);
655 /* start frozen, then let the first election sort things out */
656 if (!ctdb_blocking_freeze(ctdb)) {
657 DEBUG(0,("Failed to get initial freeze\n"));
658 exit(12);
661 /* force initial recovery for election */
662 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
664 /* now start accepting clients, only can do this once frozen */
665 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
666 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
667 ctdb_accept_client, ctdb);
669 ctdb_main_loop(ctdb);
671 return 0;
675 allocate a packet for use in daemon<->daemon communication
677 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
678 TALLOC_CTX *mem_ctx,
679 enum ctdb_operation operation,
680 size_t length, size_t slength,
681 const char *type)
683 int size;
684 struct ctdb_req_header *hdr;
686 length = MAX(length, slength);
687 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
689 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
690 if (hdr == NULL) {
691 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
692 operation, (unsigned)length));
693 return NULL;
695 talloc_set_name_const(hdr, type);
696 memset(hdr, 0, slength);
697 hdr->length = length;
698 hdr->operation = operation;
699 hdr->ctdb_magic = CTDB_MAGIC;
700 hdr->ctdb_version = CTDB_VERSION;
701 hdr->generation = ctdb->vnn_map->generation;
702 hdr->srcnode = ctdb->vnn;
704 return hdr;
707 struct daemon_control_state {
708 struct daemon_control_state *next, *prev;
709 struct ctdb_client *client;
710 struct ctdb_req_control *c;
711 uint32_t reqid;
712 struct ctdb_node *node;
716 callback when a control reply comes in
718 static void daemon_control_callback(struct ctdb_context *ctdb,
719 int32_t status, TDB_DATA data,
720 const char *errormsg,
721 void *private_data)
723 struct daemon_control_state *state = talloc_get_type(private_data,
724 struct daemon_control_state);
725 struct ctdb_client *client = state->client;
726 struct ctdb_reply_control *r;
727 size_t len;
729 /* construct a message to send to the client containing the data */
730 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
731 if (errormsg) {
732 len += strlen(errormsg);
734 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
735 struct ctdb_reply_control);
736 CTDB_NO_MEMORY_VOID(ctdb, r);
738 r->hdr.reqid = state->reqid;
739 r->status = status;
740 r->datalen = data.dsize;
741 r->errorlen = 0;
742 memcpy(&r->data[0], data.dptr, data.dsize);
743 if (errormsg) {
744 r->errorlen = strlen(errormsg);
745 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
748 daemon_queue_send(client, &r->hdr);
750 talloc_free(state);
754 fail all pending controls to a disconnected node
756 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
758 struct daemon_control_state *state;
759 while ((state = node->pending_controls)) {
760 DLIST_REMOVE(node->pending_controls, state);
761 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
762 "node is disconnected", state);
767 destroy a daemon_control_state
769 static int daemon_control_destructor(struct daemon_control_state *state)
771 if (state->node) {
772 DLIST_REMOVE(state->node->pending_controls, state);
774 return 0;
778 this is called when the ctdb daemon received a ctdb request control
779 from a local client over the unix domain socket
781 static void daemon_request_control_from_client(struct ctdb_client *client,
782 struct ctdb_req_control *c)
784 TDB_DATA data;
785 int res;
786 struct daemon_control_state *state;
787 TALLOC_CTX *tmp_ctx = talloc_new(client);
789 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
790 c->hdr.destnode = client->ctdb->vnn;
793 state = talloc(client, struct daemon_control_state);
794 CTDB_NO_MEMORY_VOID(client->ctdb, state);
796 state->client = client;
797 state->c = talloc_steal(state, c);
798 state->reqid = c->hdr.reqid;
799 if (ctdb_validate_vnn(client->ctdb, c->hdr.destnode)) {
800 state->node = client->ctdb->nodes[c->hdr.destnode];
801 DLIST_ADD(state->node->pending_controls, state);
802 } else {
803 state->node = NULL;
806 talloc_set_destructor(state, daemon_control_destructor);
808 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
809 talloc_steal(tmp_ctx, state);
812 data.dptr = &c->data[0];
813 data.dsize = c->datalen;
814 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
815 c->srvid, c->opcode, client->client_id,
816 c->flags,
817 data, daemon_control_callback,
818 state);
819 if (res != 0) {
820 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
821 c->hdr.destnode));
824 talloc_free(tmp_ctx);
828 register a call function
830 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
831 ctdb_fn_t fn, int id)
833 struct ctdb_registered_call *call;
834 struct ctdb_db_context *ctdb_db;
836 ctdb_db = find_ctdb_db(ctdb, db_id);
837 if (ctdb_db == NULL) {
838 return -1;
841 call = talloc(ctdb_db, struct ctdb_registered_call);
842 call->fn = fn;
843 call->id = id;
845 DLIST_ADD(ctdb_db->calls, call);
846 return 0;
852 this local messaging handler is ugly, but is needed to prevent
853 recursion in ctdb_send_message() when the destination node is the
854 same as the source node
856 struct ctdb_local_message {
857 struct ctdb_context *ctdb;
858 uint64_t srvid;
859 TDB_DATA data;
862 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
863 struct timeval t, void *private_data)
865 struct ctdb_local_message *m = talloc_get_type(private_data,
866 struct ctdb_local_message);
867 int res;
869 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
870 if (res != 0) {
871 DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n",
872 (unsigned long long)m->srvid));
874 talloc_free(m);
877 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
879 struct ctdb_local_message *m;
880 m = talloc(ctdb, struct ctdb_local_message);
881 CTDB_NO_MEMORY(ctdb, m);
883 m->ctdb = ctdb;
884 m->srvid = srvid;
885 m->data = data;
886 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
887 if (m->data.dptr == NULL) {
888 talloc_free(m);
889 return -1;
892 /* this needs to be done as an event to prevent recursion */
893 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
894 return 0;
898 send a ctdb message
900 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
901 uint64_t srvid, TDB_DATA data)
903 struct ctdb_req_message *r;
904 int len;
906 /* see if this is a message to ourselves */
907 if (vnn == ctdb->vnn) {
908 return ctdb_local_message(ctdb, srvid, data);
911 len = offsetof(struct ctdb_req_message, data) + data.dsize;
912 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
913 struct ctdb_req_message);
914 CTDB_NO_MEMORY(ctdb, r);
916 r->hdr.destnode = vnn;
917 r->srvid = srvid;
918 r->datalen = data.dsize;
919 memcpy(&r->data[0], data.dptr, data.dsize);
921 ctdb_queue_packet(ctdb, &r->hdr);
923 talloc_free(r);
924 return 0;