ctdb-daemon: Move some inline declarations to header file
[Samba.git] / ctdb / server / ctdb_daemon.c
blobbf8b82dd702ecfaa6d63d9d8ae9499478a283c8b
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
/*
  One list entry per accepted client connection. It records the pid of
  the peer together with the client state it belongs to, and is linked
  into ctdb->client_pids.
*/
struct ctdb_client_pid_list {
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	struct ctdb_context *ctdb;	/* owning daemon context */
	pid_t pid;			/* pid of the connected peer */
	struct ctdb_client *client;	/* client state for that peer */
};
/* Path of the PID file; while NULL no PID file is created or removed. */
const char *ctdbd_pidfile = NULL;

/* Forward declaration: used by wrappers defined ahead of the function body. */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
44 static void print_exit_message(void)
46 if (debug_extra != NULL && debug_extra[0] != '\0') {
47 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48 } else {
49 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
51 /* Wait a second to allow pending log messages to be flushed */
52 sleep(1);
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
59 struct timeval t, void *private_data)
61 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
63 if (getpid() != ctdb->ctdbd_pid) {
64 return;
67 event_add_timed(ctdb->ev, ctdb,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick, ctdb);
72 /* Used to trigger a dummy event once per second, to make
73 * detection of hangs more reliable.
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
77 event_add_timed(ctdb->ev, ctdb,
78 timeval_current_ofs(1, 0),
79 ctdb_time_tick, ctdb);
/*
  Kick off the recurring background activities of the daemon.
*/
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* keepalives: monitoring for connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* periodic update of tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* listening for recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* once-per-second timer ticks */
	ctdb_start_time_tickd(ctdb);
}
97 static void ignore_signal(int signum)
99 struct sigaction act;
101 memset(&act, 0, sizeof(act));
103 act.sa_handler = SIG_IGN;
104 sigemptyset(&act.sa_mask);
105 sigaddset(&act.sa_mask, signum);
106 sigaction(signum, &act, NULL);
111 send a packet to a client
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
115 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116 if (hdr->operation == CTDB_REQ_MESSAGE) {
117 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
119 talloc_free(client);
120 return -1;
123 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
127 message handler for when we are in daemon mode. This redirects the message
128 to the right client
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
131 TDB_DATA data, void *private_data)
133 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134 struct ctdb_req_message *r;
135 int len;
137 /* construct a message to send to the client containing the data */
138 len = offsetof(struct ctdb_req_message, data) + data.dsize;
139 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
140 len, struct ctdb_req_message);
141 CTDB_NO_MEMORY_VOID(ctdb, r);
143 talloc_set_name_const(r, "req_message packet");
145 r->srvid = srvid;
146 r->datalen = data.dsize;
147 memcpy(&r->data[0], data.dptr, data.dsize);
149 daemon_queue_send(client, &r->hdr);
151 talloc_free(r);
155 this is called when the ctdb daemon received a ctdb request to
156 set the srvid from the client
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
160 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
161 int res;
162 if (client == NULL) {
163 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
164 return -1;
166 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
167 if (res != 0) {
168 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
169 (unsigned long long)srvid));
170 } else {
171 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
172 (unsigned long long)srvid));
175 return res;
179 this is called when the ctdb daemon received a ctdb request to
180 remove a srvid from the client
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
184 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185 if (client == NULL) {
186 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
187 return -1;
189 return ctdb_deregister_message_handler(ctdb, srvid, client);
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
193 TDB_DATA *outdata)
195 uint64_t *ids;
196 int i, num_ids;
197 uint8_t *results;
199 if ((indata.dsize % sizeof(uint64_t)) != 0) {
200 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201 "size=%d\n", (int)indata.dsize));
202 return -1;
205 ids = (uint64_t *)indata.dptr;
206 num_ids = indata.dsize / 8;
208 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209 if (results == NULL) {
210 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
211 return -1;
213 for (i=0; i<num_ids; i++) {
214 if (ctdb_check_message_handler(ctdb, ids[i])) {
215 results[i/8] |= (1 << (i%8));
218 outdata->dptr = (uint8_t *)results;
219 outdata->dsize = talloc_get_size(results);
220 return 0;
224 destroy a ctdb_client
226 static int ctdb_client_destructor(struct ctdb_client *client)
228 struct ctdb_db_context *ctdb_db;
230 ctdb_takeover_client_destructor_hook(client);
231 ctdb_reqid_remove(client->ctdb, client->client_id);
232 client->ctdb->num_clients--;
234 if (client->num_persistent_updates != 0) {
235 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
238 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
239 if (ctdb_db) {
240 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241 "commit active. Forcing recovery.\n"));
242 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
245 * trans3 transaction state:
247 * The destructor sets the pointer to NULL.
249 talloc_free(ctdb_db->persistent_state);
252 return 0;
257 this is called when the ctdb daemon received a ctdb request message
258 from a local client over the unix domain socket
260 static void daemon_request_message_from_client(struct ctdb_client *client,
261 struct ctdb_req_message *c)
263 TDB_DATA data;
264 int res;
266 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
267 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
270 /* maybe the message is for another client on this node */
271 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
272 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
273 return;
276 /* its for a remote node */
277 data.dptr = &c->data[0];
278 data.dsize = c->datalen;
279 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
280 c->srvid, data);
281 if (res != 0) {
282 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
283 c->hdr.destnode));
/*
  Per-call state kept by the daemon while a client's call is in flight.
*/
struct daemon_call_state {
	struct ctdb_client *client;	/* client that issued the call */
	uint32_t reqid;			/* request id taken from the client's packet */
	struct ctdb_call *call;		/* the call being executed */
	struct timeval start_time;	/* used for the call latency statistics */

	/* readonly request ? */
	uint32_t readonly_fetch;
	/* call id the client originally asked for (set for readonly requests) */
	uint32_t client_callid;
};
300 complete a call from a client
302 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
304 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
305 struct daemon_call_state);
306 struct ctdb_reply_call *r;
307 int res;
308 uint32_t length;
309 struct ctdb_client *client = dstate->client;
310 struct ctdb_db_context *ctdb_db = state->ctdb_db;
312 talloc_steal(client, dstate);
313 talloc_steal(dstate, dstate->call);
315 res = ctdb_daemon_call_recv(state, dstate->call);
316 if (res != 0) {
317 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
318 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
320 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
321 return;
324 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
325 /* If the client asked for readonly FETCH, we remapped this to
326 FETCH_WITH_HEADER when calling the daemon. So we must
327 strip the extra header off the reply data before passing
328 it back to the client.
330 if (dstate->readonly_fetch
331 && dstate->client_callid == CTDB_FETCH_FUNC) {
332 length -= sizeof(struct ctdb_ltdb_header);
335 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
336 length, struct ctdb_reply_call);
337 if (r == NULL) {
338 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
339 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
340 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
341 return;
343 r->hdr.reqid = dstate->reqid;
344 r->status = dstate->call->status;
346 if (dstate->readonly_fetch
347 && dstate->client_callid == CTDB_FETCH_FUNC) {
348 /* client only asked for a FETCH so we must strip off
349 the extra ctdb_ltdb header
351 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
352 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
353 } else {
354 r->datalen = dstate->call->reply_data.dsize;
355 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
358 res = daemon_queue_send(client, &r->hdr);
359 if (res == -1) {
360 /* client is dead - return immediately */
361 return;
363 if (res != 0) {
364 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
366 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
367 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
368 talloc_free(dstate);
/*
  Carries the daemon context and a client id instead of a client
  pointer, so that a packet handed back later can detect that the
  client has gone away in the meantime.
*/
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;	/* daemon context used to look the client up */
	uint32_t client_id;		/* request id of the originating client */
};
377 a wrapper to catch disconnected clients
379 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
381 struct ctdb_client *client;
382 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
383 struct ctdb_daemon_packet_wrap);
384 if (w == NULL) {
385 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
386 return;
389 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
390 if (client == NULL) {
391 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
392 w->client_id));
393 talloc_free(w);
394 return;
396 talloc_free(w);
398 /* process it */
399 daemon_incoming_packet(client, hdr);
/*
  A single client request that has been parked on a deferred fetch
  queue until the in-flight fetch for the same key finishes.
*/
struct ctdb_deferred_fetch_call {
	struct ctdb_deferred_fetch_call *next;
	struct ctdb_deferred_fetch_call *prev;
	struct ctdb_req_call *c;		/* the deferred request packet */
	struct ctdb_daemon_packet_wrap *w;	/* context + id of the issuing client */
};
/*
  Head of the list of requests deferred for one key.
*/
struct ctdb_deferred_fetch_queue {
	struct ctdb_deferred_fetch_call *deferred_calls;	/* list in arrival order */
};
/*
  Private data for reprocess_deferred_call(): the deferred request and
  the client it is to be replayed for.
*/
struct ctdb_deferred_requeue {
	struct ctdb_deferred_fetch_call *dfc;	/* deferred request to replay */
	struct ctdb_client *client;		/* client that issued it */
};
417 /* called from a timer event and starts reprocessing the deferred call.*/
418 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
419 struct timeval t, void *private_data)
421 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
422 struct ctdb_client *client = dfr->client;
424 talloc_steal(client, dfr->dfc->c);
425 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
426 talloc_free(dfr);
429 /* the referral context is destroyed either after a timeout or when the initial
430 fetch-lock has finished.
431 at this stage, immediately start reprocessing the queued up deferred
432 calls so they get reprocessed immediately (and since we are dmaster at
433 this stage, trigger the waiting smbd processes to pick up and aquire the
434 record right away.
436 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
439 /* need to reprocess the packets from the queue explicitely instead of
440 just using a normal destructor since we want, need, to
441 call the clients in the same oder as the requests queued up
443 while (dfq->deferred_calls != NULL) {
444 struct ctdb_client *client;
445 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
446 struct ctdb_deferred_requeue *dfr;
448 DLIST_REMOVE(dfq->deferred_calls, dfc);
450 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
451 if (client == NULL) {
452 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
453 dfc->w->client_id));
454 continue;
457 /* process it by pushing it back onto the eventloop */
458 dfr = talloc(client, struct ctdb_deferred_requeue);
459 if (dfr == NULL) {
460 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
461 continue;
464 dfr->dfc = talloc_steal(dfr, dfc);
465 dfr->client = client;
467 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
470 return 0;
473 /* insert the new deferral context into the rb tree.
474 there should never be a pre-existing context here, but check for it
475 warn and destroy the previous context if there is already a deferral context
476 for this key.
478 static void *insert_dfq_callback(void *parm, void *data)
480 if (data) {
481 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
482 talloc_free(data);
484 return parm;
/* if the original fetch-lock did not complete within a reasonable time,
   free the context and context for all deferred requests to cause them to be
   re-inserted into the event system.

   private_data is the deferred fetch queue the timer was armed for.
*/
static void dfq_timeout(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	void *expired_queue = private_data;

	talloc_free(expired_queue);
}
497 /* This function is used in the local daemon to register a KEY in a database
498 for being "fetched"
499 While the remote fetch is in-flight, any futher attempts to re-fetch the
500 same record will be deferred until the fetch completes.
502 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
504 uint32_t *k;
505 struct ctdb_deferred_fetch_queue *dfq;
507 k = ctdb_key_to_idkey(call, call->key);
508 if (k == NULL) {
509 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
510 return -1;
513 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
514 if (dfq == NULL) {
515 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
516 talloc_free(k);
517 return -1;
519 dfq->deferred_calls = NULL;
521 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
523 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
525 /* if the fetch havent completed in 30 seconds, just tear it all down
526 and let it try again as the events are reissued */
527 event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
529 talloc_free(k);
530 return 0;
533 /* check if this is a duplicate request to a fetch already in-flight
534 if it is, make this call deferred to be reprocessed later when
535 the in-flight fetch completes.
537 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
539 uint32_t *k;
540 struct ctdb_deferred_fetch_queue *dfq;
541 struct ctdb_deferred_fetch_call *dfc;
543 k = ctdb_key_to_idkey(c, key);
544 if (k == NULL) {
545 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
546 return -1;
549 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
550 if (dfq == NULL) {
551 talloc_free(k);
552 return -1;
556 talloc_free(k);
558 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
559 if (dfc == NULL) {
560 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
561 return -1;
564 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
565 if (dfc->w == NULL) {
566 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
567 talloc_free(dfc);
568 return -1;
571 dfc->c = talloc_steal(dfc, c);
572 dfc->w->ctdb = ctdb_db->ctdb;
573 dfc->w->client_id = client->client_id;
575 DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
577 return 0;
582 this is called when the ctdb daemon received a ctdb request call
583 from a local client over the unix domain socket
585 static void daemon_request_call_from_client(struct ctdb_client *client,
586 struct ctdb_req_call *c)
588 struct ctdb_call_state *state;
589 struct ctdb_db_context *ctdb_db;
590 struct daemon_call_state *dstate;
591 struct ctdb_call *call;
592 struct ctdb_ltdb_header header;
593 TDB_DATA key, data;
594 int ret;
595 struct ctdb_context *ctdb = client->ctdb;
596 struct ctdb_daemon_packet_wrap *w;
598 CTDB_INCREMENT_STAT(ctdb, total_calls);
599 CTDB_DECREMENT_STAT(ctdb, pending_calls);
601 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
602 if (!ctdb_db) {
603 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
604 c->db_id));
605 CTDB_DECREMENT_STAT(ctdb, pending_calls);
606 return;
609 if (ctdb_db->unhealthy_reason) {
611 * this is just a warning, as the tdb should be empty anyway,
612 * and only persistent databases can be unhealthy, which doesn't
613 * use this code patch
615 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
616 ctdb_db->db_name, ctdb_db->unhealthy_reason));
619 key.dptr = c->data;
620 key.dsize = c->keylen;
622 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
623 CTDB_NO_MEMORY_VOID(ctdb, w);
625 w->ctdb = ctdb;
626 w->client_id = client->client_id;
628 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
629 (struct ctdb_req_header *)c, &data,
630 daemon_incoming_packet_wrap, w, true);
631 if (ret == -2) {
632 /* will retry later */
633 CTDB_DECREMENT_STAT(ctdb, pending_calls);
634 return;
637 talloc_free(w);
639 if (ret != 0) {
640 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
641 CTDB_DECREMENT_STAT(ctdb, pending_calls);
642 return;
646 /* check if this fetch request is a duplicate for a
647 request we already have in flight. If so defer it until
648 the first request completes.
650 if (ctdb->tunable.fetch_collapse == 1) {
651 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
652 ret = ctdb_ltdb_unlock(ctdb_db, key);
653 if (ret != 0) {
654 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
656 return;
660 /* Dont do READONLY if we dont have a tracking database */
661 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
662 c->flags &= ~CTDB_WANT_READONLY;
665 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
666 header.flags &= ~CTDB_REC_RO_FLAGS;
667 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
668 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
669 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
670 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
672 /* and clear out the tracking data */
673 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
674 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
678 /* if we are revoking, we must defer all other calls until the revoke
679 * had completed.
681 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
682 talloc_free(data.dptr);
683 ret = ctdb_ltdb_unlock(ctdb_db, key);
685 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
686 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
688 return;
691 if ((header.dmaster == ctdb->pnn)
692 && (!(c->flags & CTDB_WANT_READONLY))
693 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
694 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
695 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
696 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
698 ret = ctdb_ltdb_unlock(ctdb_db, key);
700 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
701 ctdb_fatal(ctdb, "Failed to start record revoke");
703 talloc_free(data.dptr);
705 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
706 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
709 return;
712 dstate = talloc(client, struct daemon_call_state);
713 if (dstate == NULL) {
714 ret = ctdb_ltdb_unlock(ctdb_db, key);
715 if (ret != 0) {
716 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
719 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
720 CTDB_DECREMENT_STAT(ctdb, pending_calls);
721 return;
723 dstate->start_time = timeval_current();
724 dstate->client = client;
725 dstate->reqid = c->hdr.reqid;
726 talloc_steal(dstate, data.dptr);
728 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
729 if (call == NULL) {
730 ret = ctdb_ltdb_unlock(ctdb_db, key);
731 if (ret != 0) {
732 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
735 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
736 CTDB_DECREMENT_STAT(ctdb, pending_calls);
737 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
738 return;
741 dstate->readonly_fetch = 0;
742 call->call_id = c->callid;
743 call->key = key;
744 call->call_data.dptr = c->data + c->keylen;
745 call->call_data.dsize = c->calldatalen;
746 call->flags = c->flags;
748 if (c->flags & CTDB_WANT_READONLY) {
749 /* client wants readonly record, so translate this into a
750 fetch with header. remember what the client asked for
751 so we can remap the reply back to the proper format for
752 the client in the reply
754 dstate->client_callid = call->call_id;
755 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
756 dstate->readonly_fetch = 1;
759 if (header.dmaster == ctdb->pnn) {
760 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
761 } else {
762 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
763 if (ctdb->tunable.fetch_collapse == 1) {
764 /* This request triggered a remote fetch-lock.
765 set up a deferral for this key so any additional
766 fetch-locks are deferred until the current one
767 finishes.
769 setup_deferred_fetch_locks(ctdb_db, call);
773 ret = ctdb_ltdb_unlock(ctdb_db, key);
774 if (ret != 0) {
775 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
778 if (state == NULL) {
779 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
780 CTDB_DECREMENT_STAT(ctdb, pending_calls);
781 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
782 return;
784 talloc_steal(state, dstate);
785 talloc_steal(client, state);
787 state->async.fn = daemon_call_from_client_callback;
788 state->async.private_data = dstate;
792 static void daemon_request_control_from_client(struct ctdb_client *client,
793 struct ctdb_req_control *c);
795 /* data contains a packet from the client */
796 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
798 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
799 TALLOC_CTX *tmp_ctx;
800 struct ctdb_context *ctdb = client->ctdb;
802 /* place the packet as a child of a tmp_ctx. We then use
803 talloc_free() below to free it. If any of the calls want
804 to keep it, then they will steal it somewhere else, and the
805 talloc_free() will be a no-op */
806 tmp_ctx = talloc_new(client);
807 talloc_steal(tmp_ctx, hdr);
809 if (hdr->ctdb_magic != CTDB_MAGIC) {
810 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
811 goto done;
814 if (hdr->ctdb_version != CTDB_VERSION) {
815 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
816 goto done;
819 switch (hdr->operation) {
820 case CTDB_REQ_CALL:
821 CTDB_INCREMENT_STAT(ctdb, client.req_call);
822 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
823 break;
825 case CTDB_REQ_MESSAGE:
826 CTDB_INCREMENT_STAT(ctdb, client.req_message);
827 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
828 break;
830 case CTDB_REQ_CONTROL:
831 CTDB_INCREMENT_STAT(ctdb, client.req_control);
832 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
833 break;
835 default:
836 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
837 hdr->operation));
840 done:
841 talloc_free(tmp_ctx);
845 called when the daemon gets a incoming packet
847 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
849 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
850 struct ctdb_req_header *hdr;
852 if (cnt == 0) {
853 talloc_free(client);
854 return;
857 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
859 if (cnt < sizeof(*hdr)) {
860 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
861 (unsigned)cnt);
862 return;
864 hdr = (struct ctdb_req_header *)data;
865 if (cnt != hdr->length) {
866 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
867 (unsigned)hdr->length, (unsigned)cnt);
868 return;
871 if (hdr->ctdb_magic != CTDB_MAGIC) {
872 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
873 return;
876 if (hdr->ctdb_version != CTDB_VERSION) {
877 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
878 return;
881 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
882 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
883 hdr->srcnode, hdr->destnode));
885 /* it is the responsibility of the incoming packet function to free 'data' */
886 daemon_incoming_packet(client, hdr);
890 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
892 if (client_pid->ctdb->client_pids != NULL) {
893 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
896 return 0;
900 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
901 uint16_t flags, void *private_data)
903 struct sockaddr_un addr;
904 socklen_t len;
905 int fd;
906 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
907 struct ctdb_client *client;
908 struct ctdb_client_pid_list *client_pid;
909 pid_t peer_pid = 0;
911 memset(&addr, 0, sizeof(addr));
912 len = sizeof(addr);
913 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
914 if (fd == -1) {
915 return;
918 set_nonblocking(fd);
919 set_close_on_exec(fd);
921 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
923 client = talloc_zero(ctdb, struct ctdb_client);
924 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
925 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
928 client->ctdb = ctdb;
929 client->fd = fd;
930 client->client_id = ctdb_reqid_new(ctdb, client);
931 client->pid = peer_pid;
933 client_pid = talloc(client, struct ctdb_client_pid_list);
934 if (client_pid == NULL) {
935 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
936 close(fd);
937 talloc_free(client);
938 return;
940 client_pid->ctdb = ctdb;
941 client_pid->pid = peer_pid;
942 client_pid->client = client;
944 DLIST_ADD(ctdb->client_pids, client_pid);
946 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
947 ctdb_daemon_read_cb, client,
948 "client-%u", client->pid);
950 talloc_set_destructor(client, ctdb_client_destructor);
951 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
952 ctdb->num_clients++;
958 create a unix domain socket and bind it
959 return a file descriptor open on the socket
961 static int ux_socket_bind(struct ctdb_context *ctdb)
963 struct sockaddr_un addr;
965 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
966 if (ctdb->daemon.sd == -1) {
967 return -1;
970 memset(&addr, 0, sizeof(addr));
971 addr.sun_family = AF_UNIX;
972 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
974 /* First check if an old ctdbd might be running */
975 if (connect(ctdb->daemon.sd,
976 (struct sockaddr *)&addr, sizeof(addr)) == 0) {
977 DEBUG(DEBUG_CRIT,
978 ("Something is already listening on ctdb socket '%s'\n",
979 ctdb->daemon.name));
980 goto failed;
983 /* Remove any old socket */
984 unlink(ctdb->daemon.name);
986 set_close_on_exec(ctdb->daemon.sd);
987 set_nonblocking(ctdb->daemon.sd);
989 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
990 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
991 goto failed;
994 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
995 chmod(ctdb->daemon.name, 0700) != 0) {
996 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
997 goto failed;
1001 if (listen(ctdb->daemon.sd, 100) != 0) {
1002 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1003 goto failed;
1006 return 0;
1008 failed:
1009 close(ctdb->daemon.sd);
1010 ctdb->daemon.sd = -1;
1011 return -1;
1014 static void initialise_node_flags (struct ctdb_context *ctdb)
1016 if (ctdb->pnn == -1) {
1017 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1020 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1022 /* do we start out in DISABLED mode? */
1023 if (ctdb->start_as_disabled != 0) {
1024 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1025 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1027 /* do we start out in STOPPED mode? */
1028 if (ctdb->start_as_stopped != 0) {
1029 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1030 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1034 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1035 void *private_data)
1037 if (status != 0) {
1038 ctdb_die(ctdb, "Failed to run setup event");
1040 ctdb_run_notification_script(ctdb, "setup");
1042 /* tell all other nodes we've just started up */
1043 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1044 0, CTDB_CONTROL_STARTUP, 0,
1045 CTDB_CTRL_FLAG_NOREPLY,
1046 tdb_null, NULL, NULL);
1048 /* Start the recovery daemon */
1049 if (ctdb_start_recoverd(ctdb) != 0) {
1050 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1051 exit(11);
1054 ctdb_start_periodic_events(ctdb);
1056 ctdb_wait_for_first_recovery(ctdb);
1059 static struct timeval tevent_before_wait_ts;
1060 static struct timeval tevent_after_wait_ts;
1062 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1063 void *private_data)
1065 struct timeval diff;
1066 struct timeval now;
1067 struct ctdb_context *ctdb =
1068 talloc_get_type(private_data, struct ctdb_context);
1070 if (getpid() != ctdb->ctdbd_pid) {
1071 return;
1074 now = timeval_current();
1076 switch (tp) {
1077 case TEVENT_TRACE_BEFORE_WAIT:
1078 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1079 diff = timeval_until(&tevent_after_wait_ts, &now);
1080 if (diff.tv_sec > 3) {
1081 DEBUG(DEBUG_ERR,
1082 ("Handling event took %ld seconds!\n",
1083 diff.tv_sec));
1086 tevent_before_wait_ts = now;
1087 break;
1089 case TEVENT_TRACE_AFTER_WAIT:
1090 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1091 diff = timeval_until(&tevent_before_wait_ts, &now);
1092 if (diff.tv_sec > 3) {
1093 DEBUG(DEBUG_CRIT,
1094 ("No event for %ld seconds!\n",
1095 diff.tv_sec));
1098 tevent_after_wait_ts = now;
1099 break;
1101 default:
1102 /* Do nothing for future tevent trace points */ ;
1106 static void ctdb_remove_pidfile(void)
1108 /* Only the main ctdbd's PID matches the SID */
1109 if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1110 if (unlink(ctdbd_pidfile) == 0) {
1111 DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1112 ctdbd_pidfile));
1113 } else {
1114 DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1115 ctdbd_pidfile));
1120 static void ctdb_create_pidfile(pid_t pid)
1122 if (ctdbd_pidfile != NULL) {
1123 FILE *fp;
1125 fp = fopen(ctdbd_pidfile, "w");
1126 if (fp == NULL) {
1127 DEBUG(DEBUG_ALERT,
1128 ("Failed to open PID file %s\n", ctdbd_pidfile));
1129 exit(11);
1132 fprintf(fp, "%d\n", pid);
1133 fclose(fp);
1134 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1135 atexit(ctdb_remove_pidfile);
1140 start the protocol going as a daemon
1142 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
1144 int res, ret = -1;
1145 struct fd_event *fde;
1146 const char *domain_socket_name;
1148 /* create a unix domain stream socket to listen to */
1149 res = ux_socket_bind(ctdb);
1150 if (res!=0) {
1151 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1152 exit(10);
1155 if (do_fork && fork()) {
1156 return 0;
1159 tdb_reopen_all(false);
1161 if (do_fork) {
1162 if (setsid() == -1) {
1163 ctdb_die(ctdb, "Failed to setsid()\n");
1165 close(0);
1166 if (open("/dev/null", O_RDONLY) != 0) {
1167 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1168 exit(11);
1171 ignore_signal(SIGPIPE);
1173 ctdb->ctdbd_pid = getpid();
1174 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1175 CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1176 ctdb_create_pidfile(ctdb->ctdbd_pid);
1178 /* Make sure we log something when the daemon terminates.
1179 * This must be the first exit handler to run (so the last to
1180 * be registered.
1182 atexit(print_exit_message);
1184 if (ctdb->do_setsched) {
1185 /* try to set us up as realtime */
1186 set_scheduler();
1189 /* ensure the socket is deleted on exit of the daemon */
1190 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1191 if (domain_socket_name == NULL) {
1192 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1193 exit(12);
1196 ctdb->ev = event_context_init(NULL);
1197 tevent_loop_allow_nesting(ctdb->ev);
1198 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1199 ret = ctdb_init_tevent_logging(ctdb);
1200 if (ret != 0) {
1201 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1202 exit(1);
1205 /* set up a handler to pick up sigchld */
1206 if (ctdb_init_sigchld(ctdb) == NULL) {
1207 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1208 exit(1);
1211 ctdb_set_child_logging(ctdb);
1212 if (use_syslog) {
1213 if (start_syslog_daemon(ctdb)) {
1214 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1215 exit(10);
1219 /* initialize statistics collection */
1220 ctdb_statistics_init(ctdb);
1222 /* force initial recovery for election */
1223 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1225 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1226 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1227 if (ret != 0) {
1228 ctdb_die(ctdb, "Failed to run init event\n");
1230 ctdb_run_notification_script(ctdb, "init");
1232 if (strcmp(ctdb->transport, "tcp") == 0) {
1233 ret = ctdb_tcp_init(ctdb);
1235 #ifdef USE_INFINIBAND
1236 if (strcmp(ctdb->transport, "ib") == 0) {
1237 ret = ctdb_ibw_init(ctdb);
1239 #endif
1240 if (ret != 0) {
1241 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1242 return -1;
1245 if (ctdb->methods == NULL) {
1246 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1247 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1250 /* initialise the transport */
1251 if (ctdb->methods->initialise(ctdb) != 0) {
1252 ctdb_fatal(ctdb, "transport failed to initialise");
1255 initialise_node_flags(ctdb);
1257 if (ctdb->public_addresses_file) {
1258 ret = ctdb_set_public_addresses(ctdb, true);
1259 if (ret == -1) {
1260 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1261 exit(1);
1263 if (ctdb->do_checkpublicip) {
1264 ctdb_start_monitoring_interfaces(ctdb);
1269 /* attach to existing databases */
1270 if (ctdb_attach_databases(ctdb) != 0) {
1271 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1274 /* start frozen, then let the first election sort things out */
1275 if (!ctdb_blocking_freeze(ctdb)) {
1276 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1279 /* now start accepting clients, only can do this once frozen */
1280 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
1281 EVENT_FD_READ,
1282 ctdb_accept_client, ctdb);
1283 if (fde == NULL) {
1284 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1286 tevent_fd_set_auto_close(fde);
1288 /* release any IPs we hold from previous runs of the daemon */
1289 if (ctdb->tunable.disable_ip_failover == 0) {
1290 ctdb_release_all_ips(ctdb);
1293 /* Start the transport */
1294 if (ctdb->methods->start(ctdb) != 0) {
1295 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1296 ctdb_fatal(ctdb, "transport failed to start");
1299 /* Recovery daemon and timed events are started from the
1300 * callback, only after the setup event completes
1301 * successfully.
1303 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1304 ret = ctdb_event_script_callback(ctdb,
1305 ctdb,
1306 ctdb_setup_event_callback,
1307 ctdb,
1308 CTDB_EVENT_SETUP,
1309 "%s",
1310 "");
1311 if (ret != 0) {
1312 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1313 exit(1);
1316 lockdown_memory(ctdb->valgrinding);
1318 /* go into a wait loop to allow other nodes to complete */
1319 event_loop_wait(ctdb->ev);
1321 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1322 exit(1);
1326 allocate a packet for use in daemon<->daemon communication
1328 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1329 TALLOC_CTX *mem_ctx,
1330 enum ctdb_operation operation,
1331 size_t length, size_t slength,
1332 const char *type)
1334 int size;
1335 struct ctdb_req_header *hdr;
1337 length = MAX(length, slength);
1338 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1340 if (ctdb->methods == NULL) {
1341 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1342 operation, (unsigned)length));
1343 return NULL;
1346 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1347 if (hdr == NULL) {
1348 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1349 operation, (unsigned)length));
1350 return NULL;
1352 talloc_set_name_const(hdr, type);
1353 memset(hdr, 0, slength);
1354 hdr->length = length;
1355 hdr->operation = operation;
1356 hdr->ctdb_magic = CTDB_MAGIC;
1357 hdr->ctdb_version = CTDB_VERSION;
1358 hdr->generation = ctdb->vnn_map->generation;
1359 hdr->srcnode = ctdb->pnn;
1361 return hdr;
/*
  state of one control request issued by a local client
*/
struct daemon_control_state {
	/* links for the destination node's pending_controls list */
	struct daemon_control_state *next;
	struct daemon_control_state *prev;
	/* the client that issued the control */
	struct ctdb_client *client;
	/* the request packet; reparented onto this state */
	struct ctdb_req_control *c;
	/* request id copied from the request header, echoed in the reply */
	uint32_t reqid;
	/* destination node, or NULL if the destination is not a valid pnn */
	struct ctdb_node *node;
};
1373 callback when a control reply comes in
1375 static void daemon_control_callback(struct ctdb_context *ctdb,
1376 int32_t status, TDB_DATA data,
1377 const char *errormsg,
1378 void *private_data)
1380 struct daemon_control_state *state = talloc_get_type(private_data,
1381 struct daemon_control_state);
1382 struct ctdb_client *client = state->client;
1383 struct ctdb_reply_control *r;
1384 size_t len;
1385 int ret;
1387 /* construct a message to send to the client containing the data */
1388 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1389 if (errormsg) {
1390 len += strlen(errormsg);
1392 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1393 struct ctdb_reply_control);
1394 CTDB_NO_MEMORY_VOID(ctdb, r);
1396 r->hdr.reqid = state->reqid;
1397 r->status = status;
1398 r->datalen = data.dsize;
1399 r->errorlen = 0;
1400 memcpy(&r->data[0], data.dptr, data.dsize);
1401 if (errormsg) {
1402 r->errorlen = strlen(errormsg);
1403 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1406 ret = daemon_queue_send(client, &r->hdr);
1407 if (ret != -1) {
1408 talloc_free(state);
1413 fail all pending controls to a disconnected node
1415 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1417 struct daemon_control_state *state;
1418 while ((state = node->pending_controls)) {
1419 DLIST_REMOVE(node->pending_controls, state);
1420 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1421 "node is disconnected", state);
1426 destroy a daemon_control_state
1428 static int daemon_control_destructor(struct daemon_control_state *state)
1430 if (state->node) {
1431 DLIST_REMOVE(state->node->pending_controls, state);
1433 return 0;
1437 this is called when the ctdb daemon received a ctdb request control
1438 from a local client over the unix domain socket
1440 static void daemon_request_control_from_client(struct ctdb_client *client,
1441 struct ctdb_req_control *c)
1443 TDB_DATA data;
1444 int res;
1445 struct daemon_control_state *state;
1446 TALLOC_CTX *tmp_ctx = talloc_new(client);
1448 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1449 c->hdr.destnode = client->ctdb->pnn;
1452 state = talloc(client, struct daemon_control_state);
1453 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1455 state->client = client;
1456 state->c = talloc_steal(state, c);
1457 state->reqid = c->hdr.reqid;
1458 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1459 state->node = client->ctdb->nodes[c->hdr.destnode];
1460 DLIST_ADD(state->node->pending_controls, state);
1461 } else {
1462 state->node = NULL;
1465 talloc_set_destructor(state, daemon_control_destructor);
1467 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1468 talloc_steal(tmp_ctx, state);
1471 data.dptr = &c->data[0];
1472 data.dsize = c->datalen;
1473 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1474 c->srvid, c->opcode, client->client_id,
1475 c->flags,
1476 data, daemon_control_callback,
1477 state);
1478 if (res != 0) {
1479 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1480 c->hdr.destnode));
1483 talloc_free(tmp_ctx);
1487 register a call function
1489 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1490 ctdb_fn_t fn, int id)
1492 struct ctdb_registered_call *call;
1493 struct ctdb_db_context *ctdb_db;
1495 ctdb_db = find_ctdb_db(ctdb, db_id);
1496 if (ctdb_db == NULL) {
1497 return -1;
1500 call = talloc(ctdb_db, struct ctdb_registered_call);
1501 call->fn = fn;
1502 call->id = id;
1504 DLIST_ADD(ctdb_db->calls, call);
1505 return 0;
1511 this local messaging handler is ugly, but is needed to prevent
1512 recursion in ctdb_send_message() when the destination node is the
1513 same as the source node
1515 struct ctdb_local_message {
1516 struct ctdb_context *ctdb;
1517 uint64_t srvid;
1518 TDB_DATA data;
1521 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1522 struct timeval t, void *private_data)
1524 struct ctdb_local_message *m = talloc_get_type(private_data,
1525 struct ctdb_local_message);
1526 int res;
1528 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1529 if (res != 0) {
1530 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1531 (unsigned long long)m->srvid));
1533 talloc_free(m);
1536 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1538 struct ctdb_local_message *m;
1539 m = talloc(ctdb, struct ctdb_local_message);
1540 CTDB_NO_MEMORY(ctdb, m);
1542 m->ctdb = ctdb;
1543 m->srvid = srvid;
1544 m->data = data;
1545 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1546 if (m->data.dptr == NULL) {
1547 talloc_free(m);
1548 return -1;
1551 /* this needs to be done as an event to prevent recursion */
1552 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1553 return 0;
1557 send a ctdb message
1559 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1560 uint64_t srvid, TDB_DATA data)
1562 struct ctdb_req_message *r;
1563 int len;
1565 if (ctdb->methods == NULL) {
1566 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1567 return -1;
1570 /* see if this is a message to ourselves */
1571 if (pnn == ctdb->pnn) {
1572 return ctdb_local_message(ctdb, srvid, data);
1575 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1576 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1577 struct ctdb_req_message);
1578 CTDB_NO_MEMORY(ctdb, r);
1580 r->hdr.destnode = pnn;
1581 r->srvid = srvid;
1582 r->datalen = data.dsize;
1583 memcpy(&r->data[0], data.dptr, data.dsize);
1585 ctdb_queue_packet(ctdb, &r->hdr);
1587 talloc_free(r);
1588 return 0;
1593 struct ctdb_client_notify_list {
1594 struct ctdb_client_notify_list *next, *prev;
1595 struct ctdb_context *ctdb;
1596 uint64_t srvid;
1597 TDB_DATA data;
1601 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1603 int ret;
1605 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1607 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1608 if (ret != 0) {
1609 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1612 return 0;
1615 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1617 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1618 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1619 struct ctdb_client_notify_list *nl;
1621 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1623 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1624 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1625 return -1;
1628 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1629 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1630 return -1;
1634 if (client == NULL) {
1635 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1636 return -1;
1639 for(nl=client->notify; nl; nl=nl->next) {
1640 if (nl->srvid == notify->srvid) {
1641 break;
1644 if (nl != NULL) {
1645 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1646 return -1;
1649 nl = talloc(client, struct ctdb_client_notify_list);
1650 CTDB_NO_MEMORY(ctdb, nl);
1651 nl->ctdb = ctdb;
1652 nl->srvid = notify->srvid;
1653 nl->data.dsize = notify->len;
1654 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1655 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1656 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1658 DLIST_ADD(client->notify, nl);
1659 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1661 return 0;
1664 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1666 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1667 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1668 struct ctdb_client_notify_list *nl;
1670 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1672 if (client == NULL) {
1673 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1674 return -1;
1677 for(nl=client->notify; nl; nl=nl->next) {
1678 if (nl->srvid == notify->srvid) {
1679 break;
1682 if (nl == NULL) {
1683 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1684 return -1;
1687 DLIST_REMOVE(client->notify, nl);
1688 talloc_set_destructor(nl, NULL);
1689 talloc_free(nl);
1691 return 0;
1694 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1696 struct ctdb_client_pid_list *client_pid;
1698 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1699 if (client_pid->pid == pid) {
1700 return client_pid->client;
1703 return NULL;
1707 /* This control is used by samba when probing if a process (of a samba daemon)
1708 exists on the node.
1709 Samba does this when it needs/wants to check if a subrecord in one of the
1710 databases is still valied, or if it is stale and can be removed.
1711 If the node is in unhealthy or stopped state we just kill of the samba
1712 process holding htis sub-record and return to the calling samba that
1713 the process does not exist.
1714 This allows us to forcefully recall subrecords registered by samba processes
1715 on banned and stopped nodes.
1717 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1719 struct ctdb_client *client;
1721 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1722 client = ctdb_find_client_by_pid(ctdb, pid);
1723 if (client != NULL) {
1724 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1725 talloc_free(client);
1727 return -1;
1730 return kill(pid, 0);
1733 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1735 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1736 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1737 return;
1740 DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1741 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1742 ctdb_stop_recoverd(ctdb);
1743 ctdb_stop_keepalive(ctdb);
1744 ctdb_stop_monitoring(ctdb);
1745 ctdb_release_all_ips(ctdb);
1746 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1747 if (ctdb->methods != NULL) {
1748 ctdb->methods->shutdown(ctdb);
1751 DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1752 exit(exit_code);