ctdbd: Sleep at exit to allow time for log messages to flush
[Samba.git] / ctdb / server / ctdb_daemon.c
blob644b5edf6205b6029095b5db89275d43cc952061
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
/*
  one entry per accepted client connection; ctdb_accept_client() links
  these into ctdb->client_pids
 */
struct ctdb_client_pid_list {
	/* doubly linked list pointers, maintained with DLIST_ADD/DLIST_REMOVE */
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	/* daemon context whose client_pids list holds this entry */
	struct ctdb_context *ctdb;
	/* peer pid as obtained through ctdb_get_peer_pid() at accept time */
	pid_t pid;
	/* the client structure this entry belongs to (also its talloc parent) */
	struct ctdb_client *client;
};
/* Path of the PID file; while it is NULL, ctdb_create_pidfile() and
 * ctdb_remove_pidfile() do nothing. */
40 const char *ctdbd_pidfile = NULL;
/* Forward declaration: daemon_incoming_packet() is referenced by the
 * packet wrapper and the deferred-call code before it is defined. */
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
44 static void print_exit_message(void)
46 if (debug_extra != NULL && debug_extra[0] != '\0') {
47 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48 } else {
49 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
51 /* Wait a second to allow pending log messages to be flushed */
52 sleep(1);
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
59 struct timeval t, void *private_data)
61 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
63 if (getpid() != ctdbd_pid) {
64 return;
67 event_add_timed(ctdb->ev, ctdb,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick, ctdb);
72 /* Used to trigger a dummy event once per second, to make
73 * detection of hangs more reliable.
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
77 event_add_timed(ctdb->ev, ctdb,
78 timeval_current_ofs(1, 0),
79 ctdb_time_tick, ctdb);
/*
  kick off all recurring daemon activities, in this fixed order
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* keepalives: detect connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* node health monitoring */
	ctdb_start_monitoring(ctdb);

	/* periodic refresh of the tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* begin listening for pings from the recovery daemon */
	ctdb_control_recd_ping(ctdb);

	/* once-per-second timer tick */
	ctdb_start_time_tickd(ctdb);
}
100 static void block_signal(int signum)
102 struct sigaction act;
104 memset(&act, 0, sizeof(act));
106 act.sa_handler = SIG_IGN;
107 sigemptyset(&act.sa_mask);
108 sigaddset(&act.sa_mask, signum);
109 sigaction(signum, &act, NULL);
114 send a packet to a client
116 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
118 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
119 if (hdr->operation == CTDB_REQ_MESSAGE) {
120 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
121 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
122 talloc_free(client);
123 return -1;
126 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
130 message handler for when we are in daemon mode. This redirects the message
131 to the right client
133 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
134 TDB_DATA data, void *private_data)
136 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
137 struct ctdb_req_message *r;
138 int len;
140 /* construct a message to send to the client containing the data */
141 len = offsetof(struct ctdb_req_message, data) + data.dsize;
142 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
143 len, struct ctdb_req_message);
144 CTDB_NO_MEMORY_VOID(ctdb, r);
146 talloc_set_name_const(r, "req_message packet");
148 r->srvid = srvid;
149 r->datalen = data.dsize;
150 memcpy(&r->data[0], data.dptr, data.dsize);
152 daemon_queue_send(client, &r->hdr);
154 talloc_free(r);
158 this is called when the ctdb daemon received a ctdb request to
159 set the srvid from the client
161 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
163 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
164 int res;
165 if (client == NULL) {
166 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
167 return -1;
169 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
170 if (res != 0) {
171 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
172 (unsigned long long)srvid));
173 } else {
174 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
175 (unsigned long long)srvid));
178 return res;
182 this is called when the ctdb daemon received a ctdb request to
183 remove a srvid from the client
185 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
187 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
188 if (client == NULL) {
189 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
190 return -1;
192 return ctdb_deregister_message_handler(ctdb, srvid, client);
195 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
196 TDB_DATA *outdata)
198 uint64_t *ids;
199 int i, num_ids;
200 uint8_t *results;
202 if ((indata.dsize % sizeof(uint64_t)) != 0) {
203 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
204 "size=%d\n", (int)indata.dsize));
205 return -1;
208 ids = (uint64_t *)indata.dptr;
209 num_ids = indata.dsize / 8;
211 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
212 if (results == NULL) {
213 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
214 return -1;
216 for (i=0; i<num_ids; i++) {
217 if (ctdb_check_message_handler(ctdb, ids[i])) {
218 results[i/8] |= (1 << (i%8));
221 outdata->dptr = (uint8_t *)results;
222 outdata->dsize = talloc_get_size(results);
223 return 0;
227 destroy a ctdb_client
229 static int ctdb_client_destructor(struct ctdb_client *client)
231 struct ctdb_db_context *ctdb_db;
233 ctdb_takeover_client_destructor_hook(client);
234 ctdb_reqid_remove(client->ctdb, client->client_id);
235 client->ctdb->num_clients--;
237 if (client->num_persistent_updates != 0) {
238 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
239 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
241 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
242 if (ctdb_db) {
243 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
244 "commit active. Forcing recovery.\n"));
245 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
247 /* legacy trans2 transaction state: */
248 ctdb_db->transaction_active = false;
251 * trans3 transaction state:
253 * The destructor sets the pointer to NULL.
255 talloc_free(ctdb_db->persistent_state);
258 return 0;
263 this is called when the ctdb daemon received a ctdb request message
264 from a local client over the unix domain socket
266 static void daemon_request_message_from_client(struct ctdb_client *client,
267 struct ctdb_req_message *c)
269 TDB_DATA data;
270 int res;
272 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
273 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
276 /* maybe the message is for another client on this node */
277 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
278 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
279 return;
282 /* its for a remote node */
283 data.dptr = &c->data[0];
284 data.dsize = c->datalen;
285 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
286 c->srvid, data);
287 if (res != 0) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
289 c->hdr.destnode));
/*
  per-call state kept by the daemon while a call issued by a local
  client is in progress
 */
struct daemon_call_state {
	/* client that issued the call */
	struct ctdb_client *client;
	/* reqid from the client's request, echoed in the reply */
	uint32_t reqid;
	/* the call being executed */
	struct ctdb_call *call;
	/* when processing started, used for latency statistics */
	struct timeval start_time;

	/* readonly request ? */
	uint32_t readonly_fetch;
	/* call id the client originally asked for; only set (and only
	   read) when readonly_fetch is non-zero */
	uint32_t client_callid;
};
306 complete a call from a client
308 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
310 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
311 struct daemon_call_state);
312 struct ctdb_reply_call *r;
313 int res;
314 uint32_t length;
315 struct ctdb_client *client = dstate->client;
316 struct ctdb_db_context *ctdb_db = state->ctdb_db;
318 talloc_steal(client, dstate);
319 talloc_steal(dstate, dstate->call);
321 res = ctdb_daemon_call_recv(state, dstate->call);
322 if (res != 0) {
323 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
324 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
326 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
327 return;
330 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
331 /* If the client asked for readonly FETCH, we remapped this to
332 FETCH_WITH_HEADER when calling the daemon. So we must
333 strip the extra header off the reply data before passing
334 it back to the client.
336 if (dstate->readonly_fetch
337 && dstate->client_callid == CTDB_FETCH_FUNC) {
338 length -= sizeof(struct ctdb_ltdb_header);
341 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
342 length, struct ctdb_reply_call);
343 if (r == NULL) {
344 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
345 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
346 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
347 return;
349 r->hdr.reqid = dstate->reqid;
350 r->status = dstate->call->status;
352 if (dstate->readonly_fetch
353 && dstate->client_callid == CTDB_FETCH_FUNC) {
354 /* client only asked for a FETCH so we must strip off
355 the extra ctdb_ltdb header
357 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
358 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
359 } else {
360 r->datalen = dstate->call->reply_data.dsize;
361 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
364 res = daemon_queue_send(client, &r->hdr);
365 if (res == -1) {
366 /* client is dead - return immediately */
367 return;
369 if (res != 0) {
370 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
372 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
373 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
374 talloc_free(dstate);
/*
  identifies a client by id rather than by pointer, so that a packet
  that is processed later can detect that the client has gone away
 */
struct ctdb_daemon_packet_wrap {
	/* daemon context used for the reqid lookup */
	struct ctdb_context *ctdb;
	/* reqid of the client the packet belongs to */
	uint32_t client_id;
};
383 a wrapper to catch disconnected clients
385 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
387 struct ctdb_client *client;
388 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
389 struct ctdb_daemon_packet_wrap);
390 if (w == NULL) {
391 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
392 return;
395 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
396 if (client == NULL) {
397 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
398 w->client_id));
399 talloc_free(w);
400 return;
402 talloc_free(w);
404 /* process it */
405 daemon_incoming_packet(client, hdr);
/*
  one deferred call request, queued on a ctdb_deferred_fetch_queue
 */
struct ctdb_deferred_fetch_call {
	/* list linkage within ctdb_deferred_fetch_queue.deferred_calls */
	struct ctdb_deferred_fetch_call *next;
	struct ctdb_deferred_fetch_call *prev;
	/* the deferred request packet */
	struct ctdb_req_call *c;
	/* identifies the client that sent the request */
	struct ctdb_daemon_packet_wrap *w;
};
/*
  the calls deferred while a fetch of one key is in flight
 */
struct ctdb_deferred_fetch_queue {
	/* deferred calls, appended in arrival order */
	struct ctdb_deferred_fetch_call *deferred_calls;
};
/*
  carries one deferred call from deferred_fetch_queue_destructor() to
  the timed event that reprocesses it
 */
struct ctdb_deferred_requeue {
	/* the deferred call to reprocess */
	struct ctdb_deferred_fetch_call *dfc;
	/* the client the call came from */
	struct ctdb_client *client;
};
423 /* called from a timer event and starts reprocessing the deferred call.*/
424 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
425 struct timeval t, void *private_data)
427 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
428 struct ctdb_client *client = dfr->client;
430 talloc_steal(client, dfr->dfc->c);
431 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
432 talloc_free(dfr);
435 /* the referral context is destroyed either after a timeout or when the initial
436 fetch-lock has finished.
437 at this stage, immediately start reprocessing the queued up deferred
438 calls so they get reprocessed immediately (and since we are dmaster at
439 this stage, trigger the waiting smbd processes to pick up and aquire the
440 record right away.
442 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
445 /* need to reprocess the packets from the queue explicitely instead of
446 just using a normal destructor since we want, need, to
447 call the clients in the same oder as the requests queued up
449 while (dfq->deferred_calls != NULL) {
450 struct ctdb_client *client;
451 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
452 struct ctdb_deferred_requeue *dfr;
454 DLIST_REMOVE(dfq->deferred_calls, dfc);
456 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
457 if (client == NULL) {
458 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
459 dfc->w->client_id));
460 continue;
463 /* process it by pushing it back onto the eventloop */
464 dfr = talloc(client, struct ctdb_deferred_requeue);
465 if (dfr == NULL) {
466 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
467 continue;
470 dfr->dfc = talloc_steal(dfr, dfc);
471 dfr->client = client;
473 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
476 return 0;
479 /* insert the new deferral context into the rb tree.
480 there should never be a pre-existing context here, but check for it
481 warn and destroy the previous context if there is already a deferral context
482 for this key.
484 static void *insert_dfq_callback(void *parm, void *data)
486 if (data) {
487 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
488 talloc_free(data);
490 return parm;
/* timed event handler used when the original fetch-lock did not
   complete within a reasonable time: frees the deferral context passed
   in private_data, whose destructor re-inserts all deferred requests
   into the event system.
*/
static void dfq_timeout(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	void *deferral_ctx = private_data;

	talloc_free(deferral_ctx);
}
503 /* This function is used in the local daemon to register a KEY in a database
504 for being "fetched"
505 While the remote fetch is in-flight, any futher attempts to re-fetch the
506 same record will be deferred until the fetch completes.
508 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
510 uint32_t *k;
511 struct ctdb_deferred_fetch_queue *dfq;
513 k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
514 if (k == NULL) {
515 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
516 return -1;
519 k[0] = (call->key.dsize + 3) / 4 + 1;
520 memcpy(&k[1], call->key.dptr, call->key.dsize);
522 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
523 if (dfq == NULL) {
524 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
525 talloc_free(k);
526 return -1;
528 dfq->deferred_calls = NULL;
530 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
532 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
534 /* if the fetch havent completed in 30 seconds, just tear it all down
535 and let it try again as the events are reissued */
536 event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
538 talloc_free(k);
539 return 0;
542 /* check if this is a duplicate request to a fetch already in-flight
543 if it is, make this call deferred to be reprocessed later when
544 the in-flight fetch completes.
546 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
548 uint32_t *k;
549 struct ctdb_deferred_fetch_queue *dfq;
550 struct ctdb_deferred_fetch_call *dfc;
552 k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
553 if (k == NULL) {
554 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
555 return -1;
558 k[0] = (key.dsize + 3) / 4 + 1;
559 memcpy(&k[1], key.dptr, key.dsize);
561 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
562 if (dfq == NULL) {
563 talloc_free(k);
564 return -1;
568 talloc_free(k);
570 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
571 if (dfc == NULL) {
572 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
573 return -1;
576 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
577 if (dfc->w == NULL) {
578 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
579 talloc_free(dfc);
580 return -1;
583 dfc->c = talloc_steal(dfc, c);
584 dfc->w->ctdb = ctdb_db->ctdb;
585 dfc->w->client_id = client->client_id;
587 DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
589 return 0;
594 this is called when the ctdb daemon received a ctdb request call
595 from a local client over the unix domain socket
597 static void daemon_request_call_from_client(struct ctdb_client *client,
598 struct ctdb_req_call *c)
600 struct ctdb_call_state *state;
601 struct ctdb_db_context *ctdb_db;
602 struct daemon_call_state *dstate;
603 struct ctdb_call *call;
604 struct ctdb_ltdb_header header;
605 TDB_DATA key, data;
606 int ret;
607 struct ctdb_context *ctdb = client->ctdb;
608 struct ctdb_daemon_packet_wrap *w;
610 CTDB_INCREMENT_STAT(ctdb, total_calls);
611 CTDB_DECREMENT_STAT(ctdb, pending_calls);
613 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
614 if (!ctdb_db) {
615 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
616 c->db_id));
617 CTDB_DECREMENT_STAT(ctdb, pending_calls);
618 return;
621 if (ctdb_db->unhealthy_reason) {
623 * this is just a warning, as the tdb should be empty anyway,
624 * and only persistent databases can be unhealthy, which doesn't
625 * use this code patch
627 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
628 ctdb_db->db_name, ctdb_db->unhealthy_reason));
631 key.dptr = c->data;
632 key.dsize = c->keylen;
634 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
635 CTDB_NO_MEMORY_VOID(ctdb, w);
637 w->ctdb = ctdb;
638 w->client_id = client->client_id;
640 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
641 (struct ctdb_req_header *)c, &data,
642 daemon_incoming_packet_wrap, w, true);
643 if (ret == -2) {
644 /* will retry later */
645 CTDB_DECREMENT_STAT(ctdb, pending_calls);
646 return;
649 talloc_free(w);
651 if (ret != 0) {
652 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
653 CTDB_DECREMENT_STAT(ctdb, pending_calls);
654 return;
658 /* check if this fetch request is a duplicate for a
659 request we already have in flight. If so defer it until
660 the first request completes.
662 if (ctdb->tunable.fetch_collapse == 1) {
663 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
664 ret = ctdb_ltdb_unlock(ctdb_db, key);
665 if (ret != 0) {
666 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
668 return;
672 /* Dont do READONLY if we dont have a tracking database */
673 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
674 c->flags &= ~CTDB_WANT_READONLY;
677 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
678 header.flags &= ~CTDB_REC_RO_FLAGS;
679 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
680 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
681 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
682 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
684 /* and clear out the tracking data */
685 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
686 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
690 /* if we are revoking, we must defer all other calls until the revoke
691 * had completed.
693 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
694 talloc_free(data.dptr);
695 ret = ctdb_ltdb_unlock(ctdb_db, key);
697 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
698 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
700 return;
703 if ((header.dmaster == ctdb->pnn)
704 && (!(c->flags & CTDB_WANT_READONLY))
705 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
706 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
707 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
708 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
710 ret = ctdb_ltdb_unlock(ctdb_db, key);
712 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
713 ctdb_fatal(ctdb, "Failed to start record revoke");
715 talloc_free(data.dptr);
717 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
718 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
721 return;
724 dstate = talloc(client, struct daemon_call_state);
725 if (dstate == NULL) {
726 ret = ctdb_ltdb_unlock(ctdb_db, key);
727 if (ret != 0) {
728 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
731 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
732 CTDB_DECREMENT_STAT(ctdb, pending_calls);
733 return;
735 dstate->start_time = timeval_current();
736 dstate->client = client;
737 dstate->reqid = c->hdr.reqid;
738 talloc_steal(dstate, data.dptr);
740 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
741 if (call == NULL) {
742 ret = ctdb_ltdb_unlock(ctdb_db, key);
743 if (ret != 0) {
744 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
747 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
748 CTDB_DECREMENT_STAT(ctdb, pending_calls);
749 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
750 return;
753 dstate->readonly_fetch = 0;
754 call->call_id = c->callid;
755 call->key = key;
756 call->call_data.dptr = c->data + c->keylen;
757 call->call_data.dsize = c->calldatalen;
758 call->flags = c->flags;
760 if (c->flags & CTDB_WANT_READONLY) {
761 /* client wants readonly record, so translate this into a
762 fetch with header. remember what the client asked for
763 so we can remap the reply back to the proper format for
764 the client in the reply
766 dstate->client_callid = call->call_id;
767 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
768 dstate->readonly_fetch = 1;
771 if (header.dmaster == ctdb->pnn) {
772 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
773 } else {
774 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
775 if (ctdb->tunable.fetch_collapse == 1) {
776 /* This request triggered a remote fetch-lock.
777 set up a deferral for this key so any additional
778 fetch-locks are deferred until the current one
779 finishes.
781 setup_deferred_fetch_locks(ctdb_db, call);
785 ret = ctdb_ltdb_unlock(ctdb_db, key);
786 if (ret != 0) {
787 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
790 if (state == NULL) {
791 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
792 CTDB_DECREMENT_STAT(ctdb, pending_calls);
793 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
794 return;
796 talloc_steal(state, dstate);
797 talloc_steal(client, state);
799 state->async.fn = daemon_call_from_client_callback;
800 state->async.private_data = dstate;
/* Forward declaration so that daemon_incoming_packet() below can
 * dispatch CTDB_REQ_CONTROL packets to it. */
804 static void daemon_request_control_from_client(struct ctdb_client *client,
805 struct ctdb_req_control *c);
807 /* data contains a packet from the client */
808 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
810 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
811 TALLOC_CTX *tmp_ctx;
812 struct ctdb_context *ctdb = client->ctdb;
814 /* place the packet as a child of a tmp_ctx. We then use
815 talloc_free() below to free it. If any of the calls want
816 to keep it, then they will steal it somewhere else, and the
817 talloc_free() will be a no-op */
818 tmp_ctx = talloc_new(client);
819 talloc_steal(tmp_ctx, hdr);
821 if (hdr->ctdb_magic != CTDB_MAGIC) {
822 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
823 goto done;
826 if (hdr->ctdb_version != CTDB_VERSION) {
827 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
828 goto done;
831 switch (hdr->operation) {
832 case CTDB_REQ_CALL:
833 CTDB_INCREMENT_STAT(ctdb, client.req_call);
834 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
835 break;
837 case CTDB_REQ_MESSAGE:
838 CTDB_INCREMENT_STAT(ctdb, client.req_message);
839 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
840 break;
842 case CTDB_REQ_CONTROL:
843 CTDB_INCREMENT_STAT(ctdb, client.req_control);
844 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
845 break;
847 default:
848 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
849 hdr->operation));
852 done:
853 talloc_free(tmp_ctx);
857 called when the daemon gets a incoming packet
859 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
861 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
862 struct ctdb_req_header *hdr;
864 if (cnt == 0) {
865 talloc_free(client);
866 return;
869 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
871 if (cnt < sizeof(*hdr)) {
872 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
873 (unsigned)cnt);
874 return;
876 hdr = (struct ctdb_req_header *)data;
877 if (cnt != hdr->length) {
878 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
879 (unsigned)hdr->length, (unsigned)cnt);
880 return;
883 if (hdr->ctdb_magic != CTDB_MAGIC) {
884 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
885 return;
888 if (hdr->ctdb_version != CTDB_VERSION) {
889 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
890 return;
893 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
894 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
895 hdr->srcnode, hdr->destnode));
897 /* it is the responsibility of the incoming packet function to free 'data' */
898 daemon_incoming_packet(client, hdr);
902 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
904 if (client_pid->ctdb->client_pids != NULL) {
905 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
908 return 0;
912 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
913 uint16_t flags, void *private_data)
915 struct sockaddr_un addr;
916 socklen_t len;
917 int fd;
918 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
919 struct ctdb_client *client;
920 struct ctdb_client_pid_list *client_pid;
921 pid_t peer_pid = 0;
923 memset(&addr, 0, sizeof(addr));
924 len = sizeof(addr);
925 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
926 if (fd == -1) {
927 return;
930 set_nonblocking(fd);
931 set_close_on_exec(fd);
933 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
935 client = talloc_zero(ctdb, struct ctdb_client);
936 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
937 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
940 client->ctdb = ctdb;
941 client->fd = fd;
942 client->client_id = ctdb_reqid_new(ctdb, client);
943 client->pid = peer_pid;
945 client_pid = talloc(client, struct ctdb_client_pid_list);
946 if (client_pid == NULL) {
947 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
948 close(fd);
949 talloc_free(client);
950 return;
952 client_pid->ctdb = ctdb;
953 client_pid->pid = peer_pid;
954 client_pid->client = client;
956 DLIST_ADD(ctdb->client_pids, client_pid);
958 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
959 ctdb_daemon_read_cb, client,
960 "client-%u", client->pid);
962 talloc_set_destructor(client, ctdb_client_destructor);
963 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
964 ctdb->num_clients++;
970 create a unix domain socket and bind it
971 return a file descriptor open on the socket
973 static int ux_socket_bind(struct ctdb_context *ctdb)
975 struct sockaddr_un addr;
977 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
978 if (ctdb->daemon.sd == -1) {
979 return -1;
982 memset(&addr, 0, sizeof(addr));
983 addr.sun_family = AF_UNIX;
984 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
986 /* First check if an old ctdbd might be running */
987 if (connect(ctdb->daemon.sd,
988 (struct sockaddr *)&addr, sizeof(addr)) == 0) {
989 DEBUG(DEBUG_CRIT,
990 ("Something is already listening on ctdb socket '%s'\n",
991 ctdb->daemon.name));
992 goto failed;
995 /* Remove any old socket */
996 unlink(ctdb->daemon.name);
998 set_close_on_exec(ctdb->daemon.sd);
999 set_nonblocking(ctdb->daemon.sd);
1001 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1002 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1003 goto failed;
1006 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1007 chmod(ctdb->daemon.name, 0700) != 0) {
1008 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1009 goto failed;
1013 if (listen(ctdb->daemon.sd, 100) != 0) {
1014 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1015 goto failed;
1018 return 0;
1020 failed:
1021 close(ctdb->daemon.sd);
1022 ctdb->daemon.sd = -1;
1023 return -1;
1026 static void initialise_node_flags (struct ctdb_context *ctdb)
1028 if (ctdb->pnn == -1) {
1029 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1032 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1034 /* do we start out in DISABLED mode? */
1035 if (ctdb->start_as_disabled != 0) {
1036 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1037 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1039 /* do we start out in STOPPED mode? */
1040 if (ctdb->start_as_stopped != 0) {
1041 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1042 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1046 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1047 void *private_data)
1049 if (status != 0) {
1050 ctdb_die(ctdb, "Failed to run setup event");
1052 ctdb_run_notification_script(ctdb, "setup");
1054 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_FIRST_RECOVERY);
1056 /* tell all other nodes we've just started up */
1057 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1058 0, CTDB_CONTROL_STARTUP, 0,
1059 CTDB_CTRL_FLAG_NOREPLY,
1060 tdb_null, NULL, NULL);
1062 /* Start the recovery daemon */
1063 if (ctdb_start_recoverd(ctdb) != 0) {
1064 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1065 exit(11);
1068 ctdb_start_periodic_events(ctdb);
/* Time of the most recent TEVENT_TRACE_BEFORE_WAIT and
 * TEVENT_TRACE_AFTER_WAIT trace points, as recorded by
 * ctdb_tevent_trace(). Both stay zero until the respective trace point
 * has been seen once. */
1071 static struct timeval tevent_before_wait_ts;
1072 static struct timeval tevent_after_wait_ts;
1074 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1075 void *private_data)
1077 struct timeval diff;
1078 struct timeval now;
1080 if (getpid() != ctdbd_pid) {
1081 return;
1084 now = timeval_current();
1086 switch (tp) {
1087 case TEVENT_TRACE_BEFORE_WAIT:
1088 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1089 diff = timeval_until(&tevent_after_wait_ts, &now);
1090 if (diff.tv_sec > 3) {
1091 DEBUG(DEBUG_ERR,
1092 ("Handling event took %ld seconds!\n",
1093 diff.tv_sec));
1096 tevent_before_wait_ts = now;
1097 break;
1099 case TEVENT_TRACE_AFTER_WAIT:
1100 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1101 diff = timeval_until(&tevent_before_wait_ts, &now);
1102 if (diff.tv_sec > 3) {
1103 DEBUG(DEBUG_CRIT,
1104 ("No event for %ld seconds!\n",
1105 diff.tv_sec));
1108 tevent_after_wait_ts = now;
1109 break;
1111 default:
1112 /* Do nothing for future tevent trace points */ ;
1116 static void ctdb_remove_pidfile(void)
1118 if (ctdbd_pidfile != NULL && !ctdb_is_child_process()) {
1119 if (unlink(ctdbd_pidfile) == 0) {
1120 DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1121 ctdbd_pidfile));
1122 } else {
1123 DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1124 ctdbd_pidfile));
1129 static void ctdb_create_pidfile(pid_t pid)
1131 if (ctdbd_pidfile != NULL) {
1132 FILE *fp;
1134 fp = fopen(ctdbd_pidfile, "w");
1135 if (fp == NULL) {
1136 DEBUG(DEBUG_ALERT,
1137 ("Failed to open PID file %s\n", ctdbd_pidfile));
1138 exit(11);
1141 fprintf(fp, "%d\n", pid);
1142 fclose(fp);
1143 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1144 atexit(ctdb_remove_pidfile);
1149 start the protocol going as a daemon
1151 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
1153 int res, ret = -1;
1154 struct fd_event *fde;
1155 const char *domain_socket_name;
1157 /* create a unix domain stream socket to listen to */
1158 res = ux_socket_bind(ctdb);
1159 if (res!=0) {
1160 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1161 exit(10);
1164 if (do_fork && fork()) {
1165 return 0;
1168 tdb_reopen_all(false);
1170 if (do_fork) {
1171 setsid();
1172 close(0);
1173 if (open("/dev/null", O_RDONLY) != 0) {
1174 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1175 exit(11);
1178 block_signal(SIGPIPE);
1180 ctdbd_pid = getpid();
1181 ctdb->ctdbd_pid = ctdbd_pid;
1182 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1183 CTDB_VERSION_STRING, ctdbd_pid));
1184 ctdb_create_pidfile(ctdb->ctdbd_pid);
1186 /* Make sure we log something when the daemon terminates.
1187 * This must be the first exit handler to run (so the last to
1188 * be registered.
1190 atexit(print_exit_message);
1192 if (ctdb->do_setsched) {
1193 /* try to set us up as realtime */
1194 ctdb_set_scheduler(ctdb);
1197 /* ensure the socket is deleted on exit of the daemon */
1198 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1199 if (domain_socket_name == NULL) {
1200 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1201 exit(12);
1204 ctdb->ev = event_context_init(NULL);
1205 tevent_loop_allow_nesting(ctdb->ev);
1206 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, NULL);
1207 ret = ctdb_init_tevent_logging(ctdb);
1208 if (ret != 0) {
1209 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1210 exit(1);
1213 /* set up a handler to pick up sigchld */
1214 if (ctdb_init_sigchld(ctdb) == NULL) {
1215 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1216 exit(1);
1219 ctdb_set_child_logging(ctdb);
1220 if (use_syslog) {
1221 if (start_syslog_daemon(ctdb)) {
1222 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1223 exit(10);
1227 /* initialize statistics collection */
1228 ctdb_statistics_init(ctdb);
1230 /* force initial recovery for election */
1231 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1233 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1234 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1235 if (ret != 0) {
1236 ctdb_die(ctdb, "Failed to run init event\n");
1238 ctdb_run_notification_script(ctdb, "init");
1240 if (strcmp(ctdb->transport, "tcp") == 0) {
1241 int ctdb_tcp_init(struct ctdb_context *);
1242 ret = ctdb_tcp_init(ctdb);
1244 #ifdef USE_INFINIBAND
1245 if (strcmp(ctdb->transport, "ib") == 0) {
1246 int ctdb_ibw_init(struct ctdb_context *);
1247 ret = ctdb_ibw_init(ctdb);
1249 #endif
1250 if (ret != 0) {
1251 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1252 return -1;
1255 if (ctdb->methods == NULL) {
1256 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1257 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1260 /* initialise the transport */
1261 if (ctdb->methods->initialise(ctdb) != 0) {
1262 ctdb_fatal(ctdb, "transport failed to initialise");
1265 initialise_node_flags(ctdb);
1267 if (public_address_list) {
1268 ctdb->public_addresses_file = public_address_list;
1269 ret = ctdb_set_public_addresses(ctdb, true);
1270 if (ret == -1) {
1271 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1272 exit(1);
1274 if (ctdb->do_checkpublicip) {
1275 ctdb_start_monitoring_interfaces(ctdb);
1280 /* attach to existing databases */
1281 if (ctdb_attach_databases(ctdb) != 0) {
1282 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1285 /* start frozen, then let the first election sort things out */
1286 if (!ctdb_blocking_freeze(ctdb)) {
1287 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1290 /* now start accepting clients, only can do this once frozen */
1291 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
1292 EVENT_FD_READ,
1293 ctdb_accept_client, ctdb);
1294 if (fde == NULL) {
1295 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1297 tevent_fd_set_auto_close(fde);
1299 /* release any IPs we hold from previous runs of the daemon */
1300 if (ctdb->tunable.disable_ip_failover == 0) {
1301 ctdb_release_all_ips(ctdb);
1304 /* Start the transport */
1305 if (ctdb->methods->start(ctdb) != 0) {
1306 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1307 ctdb_fatal(ctdb, "transport failed to start");
1310 /* Recovery daemon and timed events are started from the
1311 * callback, only after the setup event completes
1312 * successfully.
1314 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1315 ret = ctdb_event_script_callback(ctdb,
1316 ctdb,
1317 ctdb_setup_event_callback,
1318 ctdb,
1319 false,
1320 CTDB_EVENT_SETUP,
1321 "%s",
1322 "");
1323 if (ret != 0) {
1324 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1325 exit(1);
1328 ctdb_lockdown_memory(ctdb);
1330 /* go into a wait loop to allow other nodes to complete */
1331 event_loop_wait(ctdb->ev);
1333 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1334 exit(1);
1338 allocate a packet for use in daemon<->daemon communication
1340 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1341 TALLOC_CTX *mem_ctx,
1342 enum ctdb_operation operation,
1343 size_t length, size_t slength,
1344 const char *type)
1346 int size;
1347 struct ctdb_req_header *hdr;
1349 length = MAX(length, slength);
1350 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1352 if (ctdb->methods == NULL) {
1353 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1354 operation, (unsigned)length));
1355 return NULL;
1358 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1359 if (hdr == NULL) {
1360 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1361 operation, (unsigned)length));
1362 return NULL;
1364 talloc_set_name_const(hdr, type);
1365 memset(hdr, 0, slength);
1366 hdr->length = length;
1367 hdr->operation = operation;
1368 hdr->ctdb_magic = CTDB_MAGIC;
1369 hdr->ctdb_version = CTDB_VERSION;
1370 hdr->generation = ctdb->vnn_map->generation;
1371 hdr->srcnode = ctdb->pnn;
1373 return hdr;
/*
  state kept for a control that a local client asked us to send, until
  the reply has been passed back to that client
 */
struct daemon_control_state {
	/* linkage for the destination node's pending_controls list */
	struct daemon_control_state *next, *prev;
	/* client that issued the control and will receive the reply */
	struct ctdb_client *client;
	/* the original request, reparented onto this state */
	struct ctdb_req_control *c;
	/* request id copied from the request header; echoed in the reply */
	uint32_t reqid;
	/* destination node, or NULL when the destination is not a valid pnn */
	struct ctdb_node *node;
};
1385 callback when a control reply comes in
1387 static void daemon_control_callback(struct ctdb_context *ctdb,
1388 int32_t status, TDB_DATA data,
1389 const char *errormsg,
1390 void *private_data)
1392 struct daemon_control_state *state = talloc_get_type(private_data,
1393 struct daemon_control_state);
1394 struct ctdb_client *client = state->client;
1395 struct ctdb_reply_control *r;
1396 size_t len;
1397 int ret;
1399 /* construct a message to send to the client containing the data */
1400 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1401 if (errormsg) {
1402 len += strlen(errormsg);
1404 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1405 struct ctdb_reply_control);
1406 CTDB_NO_MEMORY_VOID(ctdb, r);
1408 r->hdr.reqid = state->reqid;
1409 r->status = status;
1410 r->datalen = data.dsize;
1411 r->errorlen = 0;
1412 memcpy(&r->data[0], data.dptr, data.dsize);
1413 if (errormsg) {
1414 r->errorlen = strlen(errormsg);
1415 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1418 ret = daemon_queue_send(client, &r->hdr);
1419 if (ret != -1) {
1420 talloc_free(state);
1425 fail all pending controls to a disconnected node
1427 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1429 struct daemon_control_state *state;
1430 while ((state = node->pending_controls)) {
1431 DLIST_REMOVE(node->pending_controls, state);
1432 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1433 "node is disconnected", state);
1438 destroy a daemon_control_state
1440 static int daemon_control_destructor(struct daemon_control_state *state)
1442 if (state->node) {
1443 DLIST_REMOVE(state->node->pending_controls, state);
1445 return 0;
1449 this is called when the ctdb daemon received a ctdb request control
1450 from a local client over the unix domain socket
1452 static void daemon_request_control_from_client(struct ctdb_client *client,
1453 struct ctdb_req_control *c)
1455 TDB_DATA data;
1456 int res;
1457 struct daemon_control_state *state;
1458 TALLOC_CTX *tmp_ctx = talloc_new(client);
1460 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1461 c->hdr.destnode = client->ctdb->pnn;
1464 state = talloc(client, struct daemon_control_state);
1465 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1467 state->client = client;
1468 state->c = talloc_steal(state, c);
1469 state->reqid = c->hdr.reqid;
1470 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1471 state->node = client->ctdb->nodes[c->hdr.destnode];
1472 DLIST_ADD(state->node->pending_controls, state);
1473 } else {
1474 state->node = NULL;
1477 talloc_set_destructor(state, daemon_control_destructor);
1479 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1480 talloc_steal(tmp_ctx, state);
1483 data.dptr = &c->data[0];
1484 data.dsize = c->datalen;
1485 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1486 c->srvid, c->opcode, client->client_id,
1487 c->flags,
1488 data, daemon_control_callback,
1489 state);
1490 if (res != 0) {
1491 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1492 c->hdr.destnode));
1495 talloc_free(tmp_ctx);
1499 register a call function
1501 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1502 ctdb_fn_t fn, int id)
1504 struct ctdb_registered_call *call;
1505 struct ctdb_db_context *ctdb_db;
1507 ctdb_db = find_ctdb_db(ctdb, db_id);
1508 if (ctdb_db == NULL) {
1509 return -1;
1512 call = talloc(ctdb_db, struct ctdb_registered_call);
1513 call->fn = fn;
1514 call->id = id;
1516 DLIST_ADD(ctdb_db->calls, call);
1517 return 0;
/*
  this local messaging handler is ugly, but is needed to prevent
  recursion in ctdb_send_message() when the destination node is the
  same as the source node
*/
struct ctdb_local_message {
	/* daemon context the message is dispatched in */
	struct ctdb_context *ctdb;
	/* service id the message is addressed to */
	uint64_t srvid;
	/* private copy of the message payload */
	TDB_DATA data;
};
1533 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1534 struct timeval t, void *private_data)
1536 struct ctdb_local_message *m = talloc_get_type(private_data,
1537 struct ctdb_local_message);
1538 int res;
1540 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1541 if (res != 0) {
1542 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1543 (unsigned long long)m->srvid));
1545 talloc_free(m);
1548 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1550 struct ctdb_local_message *m;
1551 m = talloc(ctdb, struct ctdb_local_message);
1552 CTDB_NO_MEMORY(ctdb, m);
1554 m->ctdb = ctdb;
1555 m->srvid = srvid;
1556 m->data = data;
1557 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1558 if (m->data.dptr == NULL) {
1559 talloc_free(m);
1560 return -1;
1563 /* this needs to be done as an event to prevent recursion */
1564 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1565 return 0;
1569 send a ctdb message
1571 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1572 uint64_t srvid, TDB_DATA data)
1574 struct ctdb_req_message *r;
1575 int len;
1577 if (ctdb->methods == NULL) {
1578 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1579 return -1;
1582 /* see if this is a message to ourselves */
1583 if (pnn == ctdb->pnn) {
1584 return ctdb_local_message(ctdb, srvid, data);
1587 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1588 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1589 struct ctdb_req_message);
1590 CTDB_NO_MEMORY(ctdb, r);
1592 r->hdr.destnode = pnn;
1593 r->srvid = srvid;
1594 r->datalen = data.dsize;
1595 memcpy(&r->data[0], data.dptr, data.dsize);
1597 ctdb_queue_packet(ctdb, &r->hdr);
1599 talloc_free(r);
1600 return 0;
/*
  one notification registered by a client; the stored message is
  broadcast when the entry is destroyed with its destructor armed
 */
struct ctdb_client_notify_list {
	/* linkage for the owning client's notify list */
	struct ctdb_client_notify_list *next, *prev;
	/* daemon context used to send the notification */
	struct ctdb_context *ctdb;
	/* service id the notification is sent to */
	uint64_t srvid;
	/* payload of the notification message */
	TDB_DATA data;
};
1613 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1615 int ret;
1617 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1619 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1620 if (ret != 0) {
1621 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1624 return 0;
1627 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1629 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1630 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1631 struct ctdb_client_notify_list *nl;
1633 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1635 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1636 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1637 return -1;
1640 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1641 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1642 return -1;
1646 if (client == NULL) {
1647 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1648 return -1;
1651 for(nl=client->notify; nl; nl=nl->next) {
1652 if (nl->srvid == notify->srvid) {
1653 break;
1656 if (nl != NULL) {
1657 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1658 return -1;
1661 nl = talloc(client, struct ctdb_client_notify_list);
1662 CTDB_NO_MEMORY(ctdb, nl);
1663 nl->ctdb = ctdb;
1664 nl->srvid = notify->srvid;
1665 nl->data.dsize = notify->len;
1666 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1667 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1668 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1670 DLIST_ADD(client->notify, nl);
1671 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1673 return 0;
1676 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1678 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1679 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1680 struct ctdb_client_notify_list *nl;
1682 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1684 if (client == NULL) {
1685 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1686 return -1;
1689 for(nl=client->notify; nl; nl=nl->next) {
1690 if (nl->srvid == notify->srvid) {
1691 break;
1694 if (nl == NULL) {
1695 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1696 return -1;
1699 DLIST_REMOVE(client->notify, nl);
1700 talloc_set_destructor(nl, NULL);
1701 talloc_free(nl);
1703 return 0;
1706 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1708 struct ctdb_client_pid_list *client_pid;
1710 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1711 if (client_pid->pid == pid) {
1712 return client_pid->client;
1715 return NULL;
1719 /* This control is used by samba when probing if a process (of a samba daemon)
1720 exists on the node.
1721 Samba does this when it needs/wants to check if a subrecord in one of the
1722 databases is still valied, or if it is stale and can be removed.
1723 If the node is in unhealthy or stopped state we just kill of the samba
1724 process holding htis sub-record and return to the calling samba that
1725 the process does not exist.
1726 This allows us to forcefully recall subrecords registered by samba processes
1727 on banned and stopped nodes.
1729 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1731 struct ctdb_client *client;
1733 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1734 client = ctdb_find_client_by_pid(ctdb, pid);
1735 if (client != NULL) {
1736 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1737 talloc_free(client);
1739 return -1;
1742 return kill(pid, 0);
/*
  run the daemon's shutdown sequence and exit with exit_code

  If the runstate is already CTDB_RUNSTATE_SHUTDOWN the call only logs
  and returns; otherwise it does not return.
 */
void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
{
	/* guard against running the sequence twice */
	if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
		DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
		return;
	}

	DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
	ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
	/* stop the background activities, then give up the public IPs */
	ctdb_stop_recoverd(ctdb);
	ctdb_stop_keepalive(ctdb);
	ctdb_stop_monitoring(ctdb);
	ctdb_release_all_ips(ctdb);
	/* run the "shutdown" event; its result is ignored */
	ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
	/* the transport may never have been set up */
	if (ctdb->methods != NULL) {
		ctdb->methods->shutdown(ctdb);
	}

	DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
	exit(exit_code);
}