s3:winbindd fix use of uninitialized variables
[Samba.git] / ctdb / server / ctdb_daemon.c
blob50b2de327ecfb00d8c29257215fbfac3e0e6f799
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
/*
  One entry in the daemon's list of connected client processes
  (ctdb->client_pids). Each entry ties a client's pid to its
  ctdb_client structure.
 */
struct ctdb_client_pid_list {
	/* doubly-linked list pointers used by the DLIST_* macros */
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	/* context whose client_pids list this entry lives on */
	struct ctdb_context *ctdb;
	/* pid obtained for the peer when the connection was accepted */
	pid_t pid;
	/* the client connection this entry belongs to */
	struct ctdb_client *client;
};
/* Path of the PID file. While NULL, no PID file is created or removed. */
const char *ctdbd_pidfile = NULL;

/* Defined further down; declared here because it is used before its definition. */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
44 static void print_exit_message(void)
46 if (debug_extra != NULL && debug_extra[0] != '\0') {
47 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48 } else {
49 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
51 /* Wait a second to allow pending log messages to be flushed */
52 sleep(1);
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
59 struct timeval t, void *private_data)
61 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
63 if (getpid() != ctdbd_pid) {
64 return;
67 event_add_timed(ctdb->ev, ctdb,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick, ctdb);
72 /* Used to trigger a dummy event once per second, to make
73 * detection of hangs more reliable.
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
77 event_add_timed(ctdb->ev, ctdb,
78 timeval_current_ofs(1, 0),
79 ctdb_time_tick, ctdb);
/*
  Kick off all periodic background activities of the daemon, in order:
  node keepalive, health monitoring, tcp tickle list updates, recovery
  daemon ping handling and the one-second timer tick.
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* connected/disconnected node detection */
	ctdb_start_keepalive(ctdb);

	/* node health monitoring */
	ctdb_start_monitoring(ctdb);

	/* periodic refresh of the tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* timer ticks */
	ctdb_start_time_tickd(ctdb);
}
100 static void block_signal(int signum)
102 struct sigaction act;
104 memset(&act, 0, sizeof(act));
106 act.sa_handler = SIG_IGN;
107 sigemptyset(&act.sa_mask);
108 sigaddset(&act.sa_mask, signum);
109 sigaction(signum, &act, NULL);
114 send a packet to a client
116 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
118 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
119 if (hdr->operation == CTDB_REQ_MESSAGE) {
120 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
121 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
122 talloc_free(client);
123 return -1;
126 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
130 message handler for when we are in daemon mode. This redirects the message
131 to the right client
133 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
134 TDB_DATA data, void *private_data)
136 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
137 struct ctdb_req_message *r;
138 int len;
140 /* construct a message to send to the client containing the data */
141 len = offsetof(struct ctdb_req_message, data) + data.dsize;
142 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
143 len, struct ctdb_req_message);
144 CTDB_NO_MEMORY_VOID(ctdb, r);
146 talloc_set_name_const(r, "req_message packet");
148 r->srvid = srvid;
149 r->datalen = data.dsize;
150 memcpy(&r->data[0], data.dptr, data.dsize);
152 daemon_queue_send(client, &r->hdr);
154 talloc_free(r);
158 this is called when the ctdb daemon received a ctdb request to
159 set the srvid from the client
161 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
163 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
164 int res;
165 if (client == NULL) {
166 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
167 return -1;
169 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
170 if (res != 0) {
171 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
172 (unsigned long long)srvid));
173 } else {
174 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
175 (unsigned long long)srvid));
178 return res;
182 this is called when the ctdb daemon received a ctdb request to
183 remove a srvid from the client
185 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
187 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
188 if (client == NULL) {
189 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
190 return -1;
192 return ctdb_deregister_message_handler(ctdb, srvid, client);
195 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
196 TDB_DATA *outdata)
198 uint64_t *ids;
199 int i, num_ids;
200 uint8_t *results;
202 if ((indata.dsize % sizeof(uint64_t)) != 0) {
203 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
204 "size=%d\n", (int)indata.dsize));
205 return -1;
208 ids = (uint64_t *)indata.dptr;
209 num_ids = indata.dsize / 8;
211 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
212 if (results == NULL) {
213 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
214 return -1;
216 for (i=0; i<num_ids; i++) {
217 if (ctdb_check_message_handler(ctdb, ids[i])) {
218 results[i/8] |= (1 << (i%8));
221 outdata->dptr = (uint8_t *)results;
222 outdata->dsize = talloc_get_size(results);
223 return 0;
227 destroy a ctdb_client
229 static int ctdb_client_destructor(struct ctdb_client *client)
231 struct ctdb_db_context *ctdb_db;
233 ctdb_takeover_client_destructor_hook(client);
234 ctdb_reqid_remove(client->ctdb, client->client_id);
235 client->ctdb->num_clients--;
237 if (client->num_persistent_updates != 0) {
238 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
239 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
241 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
242 if (ctdb_db) {
243 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
244 "commit active. Forcing recovery.\n"));
245 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
248 * trans3 transaction state:
250 * The destructor sets the pointer to NULL.
252 talloc_free(ctdb_db->persistent_state);
255 return 0;
260 this is called when the ctdb daemon received a ctdb request message
261 from a local client over the unix domain socket
263 static void daemon_request_message_from_client(struct ctdb_client *client,
264 struct ctdb_req_message *c)
266 TDB_DATA data;
267 int res;
269 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
270 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
273 /* maybe the message is for another client on this node */
274 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
275 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
276 return;
279 /* its for a remote node */
280 data.dptr = &c->data[0];
281 data.dsize = c->datalen;
282 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
283 c->srvid, data);
284 if (res != 0) {
285 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
286 c->hdr.destnode));
/*
  Per-call bookkeeping kept by the daemon while a call issued by a
  local client is in progress.
 */
struct daemon_call_state {
	/* client that issued the call */
	struct ctdb_client *client;
	/* reqid from the client's request, echoed in the reply */
	uint32_t reqid;
	/* the call being executed; holds the reply data on completion */
	struct ctdb_call *call;
	/* when the call was accepted, used for latency statistics */
	struct timeval start_time;

	/* readonly request ? */
	uint32_t readonly_fetch;
	/* call id the client originally asked for (set for readonly requests) */
	uint32_t client_callid;
};
303 complete a call from a client
305 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
307 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
308 struct daemon_call_state);
309 struct ctdb_reply_call *r;
310 int res;
311 uint32_t length;
312 struct ctdb_client *client = dstate->client;
313 struct ctdb_db_context *ctdb_db = state->ctdb_db;
315 talloc_steal(client, dstate);
316 talloc_steal(dstate, dstate->call);
318 res = ctdb_daemon_call_recv(state, dstate->call);
319 if (res != 0) {
320 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
321 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
323 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
324 return;
327 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
328 /* If the client asked for readonly FETCH, we remapped this to
329 FETCH_WITH_HEADER when calling the daemon. So we must
330 strip the extra header off the reply data before passing
331 it back to the client.
333 if (dstate->readonly_fetch
334 && dstate->client_callid == CTDB_FETCH_FUNC) {
335 length -= sizeof(struct ctdb_ltdb_header);
338 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
339 length, struct ctdb_reply_call);
340 if (r == NULL) {
341 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
342 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
343 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
344 return;
346 r->hdr.reqid = dstate->reqid;
347 r->status = dstate->call->status;
349 if (dstate->readonly_fetch
350 && dstate->client_callid == CTDB_FETCH_FUNC) {
351 /* client only asked for a FETCH so we must strip off
352 the extra ctdb_ltdb header
354 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
355 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
356 } else {
357 r->datalen = dstate->call->reply_data.dsize;
358 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
361 res = daemon_queue_send(client, &r->hdr);
362 if (res == -1) {
363 /* client is dead - return immediately */
364 return;
366 if (res != 0) {
367 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
369 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
370 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
371 talloc_free(dstate);
/*
  Identifies a client by id rather than by pointer, so that a packet
  which is processed later can detect that the client has gone away.
 */
struct ctdb_daemon_packet_wrap {
	/* context used to look the client up again */
	struct ctdb_context *ctdb;
	/* request id under which the client is registered */
	uint32_t client_id;
};
380 a wrapper to catch disconnected clients
382 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
384 struct ctdb_client *client;
385 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
386 struct ctdb_daemon_packet_wrap);
387 if (w == NULL) {
388 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
389 return;
392 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
393 if (client == NULL) {
394 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
395 w->client_id));
396 talloc_free(w);
397 return;
399 talloc_free(w);
401 /* process it */
402 daemon_incoming_packet(client, hdr);
/* One client call that was deferred behind an in-flight fetch. */
struct ctdb_deferred_fetch_call {
	/* doubly-linked list pointers used by the DLIST_* macros */
	struct ctdb_deferred_fetch_call *next;
	struct ctdb_deferred_fetch_call *prev;
	/* the deferred request packet */
	struct ctdb_req_call *c;
	/* identifies the client that sent it */
	struct ctdb_daemon_packet_wrap *w;
};

/* List of calls deferred for one key. */
struct ctdb_deferred_fetch_queue {
	struct ctdb_deferred_fetch_call *deferred_calls;
};

/* Argument passed to the timed event that reprocesses a deferred call. */
struct ctdb_deferred_requeue {
	/* the call to reprocess */
	struct ctdb_deferred_fetch_call *dfc;
	/* the client it is reprocessed for */
	struct ctdb_client *client;
};
420 /* called from a timer event and starts reprocessing the deferred call.*/
421 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
422 struct timeval t, void *private_data)
424 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
425 struct ctdb_client *client = dfr->client;
427 talloc_steal(client, dfr->dfc->c);
428 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
429 talloc_free(dfr);
432 /* the referral context is destroyed either after a timeout or when the initial
433 fetch-lock has finished.
434 at this stage, immediately start reprocessing the queued up deferred
435 calls so they get reprocessed immediately (and since we are dmaster at
436 this stage, trigger the waiting smbd processes to pick up and aquire the
437 record right away.
439 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
442 /* need to reprocess the packets from the queue explicitely instead of
443 just using a normal destructor since we want, need, to
444 call the clients in the same oder as the requests queued up
446 while (dfq->deferred_calls != NULL) {
447 struct ctdb_client *client;
448 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
449 struct ctdb_deferred_requeue *dfr;
451 DLIST_REMOVE(dfq->deferred_calls, dfc);
453 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
454 if (client == NULL) {
455 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
456 dfc->w->client_id));
457 continue;
460 /* process it by pushing it back onto the eventloop */
461 dfr = talloc(client, struct ctdb_deferred_requeue);
462 if (dfr == NULL) {
463 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
464 continue;
467 dfr->dfc = talloc_steal(dfr, dfc);
468 dfr->client = client;
470 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
473 return 0;
476 /* insert the new deferral context into the rb tree.
477 there should never be a pre-existing context here, but check for it
478 warn and destroy the previous context if there is already a deferral context
479 for this key.
481 static void *insert_dfq_callback(void *parm, void *data)
483 if (data) {
484 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
485 talloc_free(data);
487 return parm;
/* Timed event fired when the original fetch-lock did not complete within
   a reasonable time. Frees the deferral context passed as private_data,
   which causes the deferred requests to be re-inserted into the event
   system.
*/
static void dfq_timeout(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	void *deferral_ctx = private_data;

	talloc_free(deferral_ctx);
}
500 /* This function is used in the local daemon to register a KEY in a database
501 for being "fetched"
502 While the remote fetch is in-flight, any futher attempts to re-fetch the
503 same record will be deferred until the fetch completes.
505 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
507 uint32_t *k;
508 struct ctdb_deferred_fetch_queue *dfq;
510 k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
511 if (k == NULL) {
512 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
513 return -1;
516 k[0] = (call->key.dsize + 3) / 4 + 1;
517 memcpy(&k[1], call->key.dptr, call->key.dsize);
519 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
520 if (dfq == NULL) {
521 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
522 talloc_free(k);
523 return -1;
525 dfq->deferred_calls = NULL;
527 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
529 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
531 /* if the fetch havent completed in 30 seconds, just tear it all down
532 and let it try again as the events are reissued */
533 event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
535 talloc_free(k);
536 return 0;
539 /* check if this is a duplicate request to a fetch already in-flight
540 if it is, make this call deferred to be reprocessed later when
541 the in-flight fetch completes.
543 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
545 uint32_t *k;
546 struct ctdb_deferred_fetch_queue *dfq;
547 struct ctdb_deferred_fetch_call *dfc;
549 k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
550 if (k == NULL) {
551 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
552 return -1;
555 k[0] = (key.dsize + 3) / 4 + 1;
556 memcpy(&k[1], key.dptr, key.dsize);
558 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
559 if (dfq == NULL) {
560 talloc_free(k);
561 return -1;
565 talloc_free(k);
567 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
568 if (dfc == NULL) {
569 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
570 return -1;
573 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
574 if (dfc->w == NULL) {
575 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
576 talloc_free(dfc);
577 return -1;
580 dfc->c = talloc_steal(dfc, c);
581 dfc->w->ctdb = ctdb_db->ctdb;
582 dfc->w->client_id = client->client_id;
584 DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
586 return 0;
591 this is called when the ctdb daemon received a ctdb request call
592 from a local client over the unix domain socket
594 static void daemon_request_call_from_client(struct ctdb_client *client,
595 struct ctdb_req_call *c)
597 struct ctdb_call_state *state;
598 struct ctdb_db_context *ctdb_db;
599 struct daemon_call_state *dstate;
600 struct ctdb_call *call;
601 struct ctdb_ltdb_header header;
602 TDB_DATA key, data;
603 int ret;
604 struct ctdb_context *ctdb = client->ctdb;
605 struct ctdb_daemon_packet_wrap *w;
607 CTDB_INCREMENT_STAT(ctdb, total_calls);
608 CTDB_DECREMENT_STAT(ctdb, pending_calls);
610 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
611 if (!ctdb_db) {
612 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
613 c->db_id));
614 CTDB_DECREMENT_STAT(ctdb, pending_calls);
615 return;
618 if (ctdb_db->unhealthy_reason) {
620 * this is just a warning, as the tdb should be empty anyway,
621 * and only persistent databases can be unhealthy, which doesn't
622 * use this code patch
624 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
625 ctdb_db->db_name, ctdb_db->unhealthy_reason));
628 key.dptr = c->data;
629 key.dsize = c->keylen;
631 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
632 CTDB_NO_MEMORY_VOID(ctdb, w);
634 w->ctdb = ctdb;
635 w->client_id = client->client_id;
637 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
638 (struct ctdb_req_header *)c, &data,
639 daemon_incoming_packet_wrap, w, true);
640 if (ret == -2) {
641 /* will retry later */
642 CTDB_DECREMENT_STAT(ctdb, pending_calls);
643 return;
646 talloc_free(w);
648 if (ret != 0) {
649 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
650 CTDB_DECREMENT_STAT(ctdb, pending_calls);
651 return;
655 /* check if this fetch request is a duplicate for a
656 request we already have in flight. If so defer it until
657 the first request completes.
659 if (ctdb->tunable.fetch_collapse == 1) {
660 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
661 ret = ctdb_ltdb_unlock(ctdb_db, key);
662 if (ret != 0) {
663 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
665 return;
669 /* Dont do READONLY if we dont have a tracking database */
670 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
671 c->flags &= ~CTDB_WANT_READONLY;
674 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
675 header.flags &= ~CTDB_REC_RO_FLAGS;
676 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
677 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
678 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
679 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
681 /* and clear out the tracking data */
682 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
683 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
687 /* if we are revoking, we must defer all other calls until the revoke
688 * had completed.
690 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
691 talloc_free(data.dptr);
692 ret = ctdb_ltdb_unlock(ctdb_db, key);
694 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
695 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
697 return;
700 if ((header.dmaster == ctdb->pnn)
701 && (!(c->flags & CTDB_WANT_READONLY))
702 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
703 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
704 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
705 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
707 ret = ctdb_ltdb_unlock(ctdb_db, key);
709 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
710 ctdb_fatal(ctdb, "Failed to start record revoke");
712 talloc_free(data.dptr);
714 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
715 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
718 return;
721 dstate = talloc(client, struct daemon_call_state);
722 if (dstate == NULL) {
723 ret = ctdb_ltdb_unlock(ctdb_db, key);
724 if (ret != 0) {
725 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
728 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
729 CTDB_DECREMENT_STAT(ctdb, pending_calls);
730 return;
732 dstate->start_time = timeval_current();
733 dstate->client = client;
734 dstate->reqid = c->hdr.reqid;
735 talloc_steal(dstate, data.dptr);
737 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
738 if (call == NULL) {
739 ret = ctdb_ltdb_unlock(ctdb_db, key);
740 if (ret != 0) {
741 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
744 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
745 CTDB_DECREMENT_STAT(ctdb, pending_calls);
746 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
747 return;
750 dstate->readonly_fetch = 0;
751 call->call_id = c->callid;
752 call->key = key;
753 call->call_data.dptr = c->data + c->keylen;
754 call->call_data.dsize = c->calldatalen;
755 call->flags = c->flags;
757 if (c->flags & CTDB_WANT_READONLY) {
758 /* client wants readonly record, so translate this into a
759 fetch with header. remember what the client asked for
760 so we can remap the reply back to the proper format for
761 the client in the reply
763 dstate->client_callid = call->call_id;
764 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
765 dstate->readonly_fetch = 1;
768 if (header.dmaster == ctdb->pnn) {
769 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
770 } else {
771 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
772 if (ctdb->tunable.fetch_collapse == 1) {
773 /* This request triggered a remote fetch-lock.
774 set up a deferral for this key so any additional
775 fetch-locks are deferred until the current one
776 finishes.
778 setup_deferred_fetch_locks(ctdb_db, call);
782 ret = ctdb_ltdb_unlock(ctdb_db, key);
783 if (ret != 0) {
784 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
787 if (state == NULL) {
788 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
789 CTDB_DECREMENT_STAT(ctdb, pending_calls);
790 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
791 return;
793 talloc_steal(state, dstate);
794 talloc_steal(client, state);
796 state->async.fn = daemon_call_from_client_callback;
797 state->async.private_data = dstate;
801 static void daemon_request_control_from_client(struct ctdb_client *client,
802 struct ctdb_req_control *c);
804 /* data contains a packet from the client */
805 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
807 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
808 TALLOC_CTX *tmp_ctx;
809 struct ctdb_context *ctdb = client->ctdb;
811 /* place the packet as a child of a tmp_ctx. We then use
812 talloc_free() below to free it. If any of the calls want
813 to keep it, then they will steal it somewhere else, and the
814 talloc_free() will be a no-op */
815 tmp_ctx = talloc_new(client);
816 talloc_steal(tmp_ctx, hdr);
818 if (hdr->ctdb_magic != CTDB_MAGIC) {
819 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
820 goto done;
823 if (hdr->ctdb_version != CTDB_VERSION) {
824 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
825 goto done;
828 switch (hdr->operation) {
829 case CTDB_REQ_CALL:
830 CTDB_INCREMENT_STAT(ctdb, client.req_call);
831 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
832 break;
834 case CTDB_REQ_MESSAGE:
835 CTDB_INCREMENT_STAT(ctdb, client.req_message);
836 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
837 break;
839 case CTDB_REQ_CONTROL:
840 CTDB_INCREMENT_STAT(ctdb, client.req_control);
841 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
842 break;
844 default:
845 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
846 hdr->operation));
849 done:
850 talloc_free(tmp_ctx);
854 called when the daemon gets a incoming packet
856 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
858 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
859 struct ctdb_req_header *hdr;
861 if (cnt == 0) {
862 talloc_free(client);
863 return;
866 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
868 if (cnt < sizeof(*hdr)) {
869 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
870 (unsigned)cnt);
871 return;
873 hdr = (struct ctdb_req_header *)data;
874 if (cnt != hdr->length) {
875 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
876 (unsigned)hdr->length, (unsigned)cnt);
877 return;
880 if (hdr->ctdb_magic != CTDB_MAGIC) {
881 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
882 return;
885 if (hdr->ctdb_version != CTDB_VERSION) {
886 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
887 return;
890 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
891 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
892 hdr->srcnode, hdr->destnode));
894 /* it is the responsibility of the incoming packet function to free 'data' */
895 daemon_incoming_packet(client, hdr);
899 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
901 if (client_pid->ctdb->client_pids != NULL) {
902 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
905 return 0;
909 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
910 uint16_t flags, void *private_data)
912 struct sockaddr_un addr;
913 socklen_t len;
914 int fd;
915 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
916 struct ctdb_client *client;
917 struct ctdb_client_pid_list *client_pid;
918 pid_t peer_pid = 0;
920 memset(&addr, 0, sizeof(addr));
921 len = sizeof(addr);
922 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
923 if (fd == -1) {
924 return;
927 set_nonblocking(fd);
928 set_close_on_exec(fd);
930 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
932 client = talloc_zero(ctdb, struct ctdb_client);
933 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
934 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
937 client->ctdb = ctdb;
938 client->fd = fd;
939 client->client_id = ctdb_reqid_new(ctdb, client);
940 client->pid = peer_pid;
942 client_pid = talloc(client, struct ctdb_client_pid_list);
943 if (client_pid == NULL) {
944 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
945 close(fd);
946 talloc_free(client);
947 return;
949 client_pid->ctdb = ctdb;
950 client_pid->pid = peer_pid;
951 client_pid->client = client;
953 DLIST_ADD(ctdb->client_pids, client_pid);
955 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
956 ctdb_daemon_read_cb, client,
957 "client-%u", client->pid);
959 talloc_set_destructor(client, ctdb_client_destructor);
960 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
961 ctdb->num_clients++;
967 create a unix domain socket and bind it
968 return a file descriptor open on the socket
970 static int ux_socket_bind(struct ctdb_context *ctdb)
972 struct sockaddr_un addr;
974 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
975 if (ctdb->daemon.sd == -1) {
976 return -1;
979 memset(&addr, 0, sizeof(addr));
980 addr.sun_family = AF_UNIX;
981 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
983 /* First check if an old ctdbd might be running */
984 if (connect(ctdb->daemon.sd,
985 (struct sockaddr *)&addr, sizeof(addr)) == 0) {
986 DEBUG(DEBUG_CRIT,
987 ("Something is already listening on ctdb socket '%s'\n",
988 ctdb->daemon.name));
989 goto failed;
992 /* Remove any old socket */
993 unlink(ctdb->daemon.name);
995 set_close_on_exec(ctdb->daemon.sd);
996 set_nonblocking(ctdb->daemon.sd);
998 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
999 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1000 goto failed;
1003 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1004 chmod(ctdb->daemon.name, 0700) != 0) {
1005 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1006 goto failed;
1010 if (listen(ctdb->daemon.sd, 100) != 0) {
1011 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1012 goto failed;
1015 return 0;
1017 failed:
1018 close(ctdb->daemon.sd);
1019 ctdb->daemon.sd = -1;
1020 return -1;
1023 static void initialise_node_flags (struct ctdb_context *ctdb)
1025 if (ctdb->pnn == -1) {
1026 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1029 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1031 /* do we start out in DISABLED mode? */
1032 if (ctdb->start_as_disabled != 0) {
1033 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1034 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1036 /* do we start out in STOPPED mode? */
1037 if (ctdb->start_as_stopped != 0) {
1038 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1039 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1043 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1044 void *private_data)
1046 if (status != 0) {
1047 ctdb_die(ctdb, "Failed to run setup event");
1049 ctdb_run_notification_script(ctdb, "setup");
1051 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_FIRST_RECOVERY);
1053 /* tell all other nodes we've just started up */
1054 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1055 0, CTDB_CONTROL_STARTUP, 0,
1056 CTDB_CTRL_FLAG_NOREPLY,
1057 tdb_null, NULL, NULL);
1059 /* Start the recovery daemon */
1060 if (ctdb_start_recoverd(ctdb) != 0) {
1061 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1062 exit(11);
1065 ctdb_start_periodic_events(ctdb);
1068 static struct timeval tevent_before_wait_ts;
1069 static struct timeval tevent_after_wait_ts;
1071 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1072 void *private_data)
1074 struct timeval diff;
1075 struct timeval now;
1077 if (getpid() != ctdbd_pid) {
1078 return;
1081 now = timeval_current();
1083 switch (tp) {
1084 case TEVENT_TRACE_BEFORE_WAIT:
1085 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1086 diff = timeval_until(&tevent_after_wait_ts, &now);
1087 if (diff.tv_sec > 3) {
1088 DEBUG(DEBUG_ERR,
1089 ("Handling event took %ld seconds!\n",
1090 diff.tv_sec));
1093 tevent_before_wait_ts = now;
1094 break;
1096 case TEVENT_TRACE_AFTER_WAIT:
1097 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1098 diff = timeval_until(&tevent_before_wait_ts, &now);
1099 if (diff.tv_sec > 3) {
1100 DEBUG(DEBUG_CRIT,
1101 ("No event for %ld seconds!\n",
1102 diff.tv_sec));
1105 tevent_after_wait_ts = now;
1106 break;
1108 default:
1109 /* Do nothing for future tevent trace points */ ;
1113 static void ctdb_remove_pidfile(void)
1115 if (ctdbd_pidfile != NULL && !ctdb_is_child_process()) {
1116 if (unlink(ctdbd_pidfile) == 0) {
1117 DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1118 ctdbd_pidfile));
1119 } else {
1120 DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1121 ctdbd_pidfile));
1126 static void ctdb_create_pidfile(pid_t pid)
1128 if (ctdbd_pidfile != NULL) {
1129 FILE *fp;
1131 fp = fopen(ctdbd_pidfile, "w");
1132 if (fp == NULL) {
1133 DEBUG(DEBUG_ALERT,
1134 ("Failed to open PID file %s\n", ctdbd_pidfile));
1135 exit(11);
1138 fprintf(fp, "%d\n", pid);
1139 fclose(fp);
1140 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1141 atexit(ctdb_remove_pidfile);
1146 start the protocol going as a daemon
1148 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
1150 int res, ret = -1;
1151 struct fd_event *fde;
1152 const char *domain_socket_name;
1154 /* create a unix domain stream socket to listen to */
1155 res = ux_socket_bind(ctdb);
1156 if (res!=0) {
1157 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1158 exit(10);
1161 if (do_fork && fork()) {
1162 return 0;
1165 tdb_reopen_all(false);
1167 if (do_fork) {
1168 setsid();
1169 close(0);
1170 if (open("/dev/null", O_RDONLY) != 0) {
1171 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1172 exit(11);
1175 block_signal(SIGPIPE);
1177 ctdbd_pid = getpid();
1178 ctdb->ctdbd_pid = ctdbd_pid;
1179 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1180 CTDB_VERSION_STRING, ctdbd_pid));
1181 ctdb_create_pidfile(ctdb->ctdbd_pid);
1183 /* Make sure we log something when the daemon terminates.
1184 * This must be the first exit handler to run (so the last to
1185 * be registered.
1187 atexit(print_exit_message);
1189 if (ctdb->do_setsched) {
1190 /* try to set us up as realtime */
1191 ctdb_set_scheduler(ctdb);
1194 /* ensure the socket is deleted on exit of the daemon */
1195 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1196 if (domain_socket_name == NULL) {
1197 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1198 exit(12);
1201 ctdb->ev = event_context_init(NULL);
1202 tevent_loop_allow_nesting(ctdb->ev);
1203 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, NULL);
1204 ret = ctdb_init_tevent_logging(ctdb);
1205 if (ret != 0) {
1206 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1207 exit(1);
1210 /* set up a handler to pick up sigchld */
1211 if (ctdb_init_sigchld(ctdb) == NULL) {
1212 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1213 exit(1);
1216 ctdb_set_child_logging(ctdb);
1217 if (use_syslog) {
1218 if (start_syslog_daemon(ctdb)) {
1219 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1220 exit(10);
1224 /* initialize statistics collection */
1225 ctdb_statistics_init(ctdb);
1227 /* force initial recovery for election */
1228 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1230 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1231 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1232 if (ret != 0) {
1233 ctdb_die(ctdb, "Failed to run init event\n");
1235 ctdb_run_notification_script(ctdb, "init");
1237 if (strcmp(ctdb->transport, "tcp") == 0) {
1238 int ctdb_tcp_init(struct ctdb_context *);
1239 ret = ctdb_tcp_init(ctdb);
1241 #ifdef USE_INFINIBAND
1242 if (strcmp(ctdb->transport, "ib") == 0) {
1243 int ctdb_ibw_init(struct ctdb_context *);
1244 ret = ctdb_ibw_init(ctdb);
1246 #endif
1247 if (ret != 0) {
1248 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1249 return -1;
1252 if (ctdb->methods == NULL) {
1253 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1254 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1257 /* initialise the transport */
1258 if (ctdb->methods->initialise(ctdb) != 0) {
1259 ctdb_fatal(ctdb, "transport failed to initialise");
1262 initialise_node_flags(ctdb);
1264 if (ctdb->public_addresses_file) {
1265 ret = ctdb_set_public_addresses(ctdb, true);
1266 if (ret == -1) {
1267 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1268 exit(1);
1270 if (ctdb->do_checkpublicip) {
1271 ctdb_start_monitoring_interfaces(ctdb);
1276 /* attach to existing databases */
1277 if (ctdb_attach_databases(ctdb) != 0) {
1278 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1281 /* start frozen, then let the first election sort things out */
1282 if (!ctdb_blocking_freeze(ctdb)) {
1283 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1286 /* now start accepting clients, only can do this once frozen */
1287 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
1288 EVENT_FD_READ,
1289 ctdb_accept_client, ctdb);
1290 if (fde == NULL) {
1291 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1293 tevent_fd_set_auto_close(fde);
1295 /* release any IPs we hold from previous runs of the daemon */
1296 if (ctdb->tunable.disable_ip_failover == 0) {
1297 ctdb_release_all_ips(ctdb);
1300 /* Start the transport */
1301 if (ctdb->methods->start(ctdb) != 0) {
1302 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1303 ctdb_fatal(ctdb, "transport failed to start");
1306 /* Recovery daemon and timed events are started from the
1307 * callback, only after the setup event completes
1308 * successfully.
1310 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1311 ret = ctdb_event_script_callback(ctdb,
1312 ctdb,
1313 ctdb_setup_event_callback,
1314 ctdb,
1315 false,
1316 CTDB_EVENT_SETUP,
1317 "%s",
1318 "");
1319 if (ret != 0) {
1320 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1321 exit(1);
1324 ctdb_lockdown_memory(ctdb);
1326 /* go into a wait loop to allow other nodes to complete */
1327 event_loop_wait(ctdb->ev);
1329 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1330 exit(1);
1334 allocate a packet for use in daemon<->daemon communication
1336 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1337 TALLOC_CTX *mem_ctx,
1338 enum ctdb_operation operation,
1339 size_t length, size_t slength,
1340 const char *type)
1342 int size;
1343 struct ctdb_req_header *hdr;
1345 length = MAX(length, slength);
1346 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1348 if (ctdb->methods == NULL) {
1349 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1350 operation, (unsigned)length));
1351 return NULL;
1354 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1355 if (hdr == NULL) {
1356 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1357 operation, (unsigned)length));
1358 return NULL;
1360 talloc_set_name_const(hdr, type);
1361 memset(hdr, 0, slength);
1362 hdr->length = length;
1363 hdr->operation = operation;
1364 hdr->ctdb_magic = CTDB_MAGIC;
1365 hdr->ctdb_version = CTDB_VERSION;
1366 hdr->generation = ctdb->vnn_map->generation;
1367 hdr->srcnode = ctdb->pnn;
1369 return hdr;
/*
  state kept for a control that a local client asked us to send
 */
struct daemon_control_state {
	/* linkage for the destination node's pending_controls list */
	struct daemon_control_state *next;
	struct daemon_control_state *prev;
	/* the client that issued the control */
	struct ctdb_client *client;
	/* the request packet, reparented onto this state */
	struct ctdb_req_control *c;
	/* request id copied from the request header, used in the reply */
	uint32_t reqid;
	/* destination node, or NULL when the destination is not a valid pnn */
	struct ctdb_node *node;
};
1381 callback when a control reply comes in
1383 static void daemon_control_callback(struct ctdb_context *ctdb,
1384 int32_t status, TDB_DATA data,
1385 const char *errormsg,
1386 void *private_data)
1388 struct daemon_control_state *state = talloc_get_type(private_data,
1389 struct daemon_control_state);
1390 struct ctdb_client *client = state->client;
1391 struct ctdb_reply_control *r;
1392 size_t len;
1393 int ret;
1395 /* construct a message to send to the client containing the data */
1396 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1397 if (errormsg) {
1398 len += strlen(errormsg);
1400 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1401 struct ctdb_reply_control);
1402 CTDB_NO_MEMORY_VOID(ctdb, r);
1404 r->hdr.reqid = state->reqid;
1405 r->status = status;
1406 r->datalen = data.dsize;
1407 r->errorlen = 0;
1408 memcpy(&r->data[0], data.dptr, data.dsize);
1409 if (errormsg) {
1410 r->errorlen = strlen(errormsg);
1411 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1414 ret = daemon_queue_send(client, &r->hdr);
1415 if (ret != -1) {
1416 talloc_free(state);
1421 fail all pending controls to a disconnected node
1423 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1425 struct daemon_control_state *state;
1426 while ((state = node->pending_controls)) {
1427 DLIST_REMOVE(node->pending_controls, state);
1428 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1429 "node is disconnected", state);
1434 destroy a daemon_control_state
1436 static int daemon_control_destructor(struct daemon_control_state *state)
1438 if (state->node) {
1439 DLIST_REMOVE(state->node->pending_controls, state);
1441 return 0;
1445 this is called when the ctdb daemon received a ctdb request control
1446 from a local client over the unix domain socket
1448 static void daemon_request_control_from_client(struct ctdb_client *client,
1449 struct ctdb_req_control *c)
1451 TDB_DATA data;
1452 int res;
1453 struct daemon_control_state *state;
1454 TALLOC_CTX *tmp_ctx = talloc_new(client);
1456 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1457 c->hdr.destnode = client->ctdb->pnn;
1460 state = talloc(client, struct daemon_control_state);
1461 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1463 state->client = client;
1464 state->c = talloc_steal(state, c);
1465 state->reqid = c->hdr.reqid;
1466 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1467 state->node = client->ctdb->nodes[c->hdr.destnode];
1468 DLIST_ADD(state->node->pending_controls, state);
1469 } else {
1470 state->node = NULL;
1473 talloc_set_destructor(state, daemon_control_destructor);
1475 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1476 talloc_steal(tmp_ctx, state);
1479 data.dptr = &c->data[0];
1480 data.dsize = c->datalen;
1481 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1482 c->srvid, c->opcode, client->client_id,
1483 c->flags,
1484 data, daemon_control_callback,
1485 state);
1486 if (res != 0) {
1487 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1488 c->hdr.destnode));
1491 talloc_free(tmp_ctx);
1495 register a call function
1497 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1498 ctdb_fn_t fn, int id)
1500 struct ctdb_registered_call *call;
1501 struct ctdb_db_context *ctdb_db;
1503 ctdb_db = find_ctdb_db(ctdb, db_id);
1504 if (ctdb_db == NULL) {
1505 return -1;
1508 call = talloc(ctdb_db, struct ctdb_registered_call);
1509 call->fn = fn;
1510 call->id = id;
1512 DLIST_ADD(ctdb_db->calls, call);
1513 return 0;
1519 this local messaging handler is ugly, but is needed to prevent
1520 recursion in ctdb_send_message() when the destination node is the
1521 same as the source node
1523 struct ctdb_local_message {
1524 struct ctdb_context *ctdb;
1525 uint64_t srvid;
1526 TDB_DATA data;
1529 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1530 struct timeval t, void *private_data)
1532 struct ctdb_local_message *m = talloc_get_type(private_data,
1533 struct ctdb_local_message);
1534 int res;
1536 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1537 if (res != 0) {
1538 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1539 (unsigned long long)m->srvid));
1541 talloc_free(m);
1544 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1546 struct ctdb_local_message *m;
1547 m = talloc(ctdb, struct ctdb_local_message);
1548 CTDB_NO_MEMORY(ctdb, m);
1550 m->ctdb = ctdb;
1551 m->srvid = srvid;
1552 m->data = data;
1553 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1554 if (m->data.dptr == NULL) {
1555 talloc_free(m);
1556 return -1;
1559 /* this needs to be done as an event to prevent recursion */
1560 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1561 return 0;
1565 send a ctdb message
1567 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1568 uint64_t srvid, TDB_DATA data)
1570 struct ctdb_req_message *r;
1571 int len;
1573 if (ctdb->methods == NULL) {
1574 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1575 return -1;
1578 /* see if this is a message to ourselves */
1579 if (pnn == ctdb->pnn) {
1580 return ctdb_local_message(ctdb, srvid, data);
1583 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1584 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1585 struct ctdb_req_message);
1586 CTDB_NO_MEMORY(ctdb, r);
1588 r->hdr.destnode = pnn;
1589 r->srvid = srvid;
1590 r->datalen = data.dsize;
1591 memcpy(&r->data[0], data.dptr, data.dsize);
1593 ctdb_queue_packet(ctdb, &r->hdr);
1595 talloc_free(r);
1596 return 0;
1601 struct ctdb_client_notify_list {
1602 struct ctdb_client_notify_list *next, *prev;
1603 struct ctdb_context *ctdb;
1604 uint64_t srvid;
1605 TDB_DATA data;
1609 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1611 int ret;
1613 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1615 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1616 if (ret != 0) {
1617 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1620 return 0;
1623 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1625 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1626 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1627 struct ctdb_client_notify_list *nl;
1629 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1631 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1632 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1633 return -1;
1636 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1637 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1638 return -1;
1642 if (client == NULL) {
1643 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1644 return -1;
1647 for(nl=client->notify; nl; nl=nl->next) {
1648 if (nl->srvid == notify->srvid) {
1649 break;
1652 if (nl != NULL) {
1653 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1654 return -1;
1657 nl = talloc(client, struct ctdb_client_notify_list);
1658 CTDB_NO_MEMORY(ctdb, nl);
1659 nl->ctdb = ctdb;
1660 nl->srvid = notify->srvid;
1661 nl->data.dsize = notify->len;
1662 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1663 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1664 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1666 DLIST_ADD(client->notify, nl);
1667 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1669 return 0;
1672 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1674 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1675 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1676 struct ctdb_client_notify_list *nl;
1678 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1680 if (client == NULL) {
1681 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1682 return -1;
1685 for(nl=client->notify; nl; nl=nl->next) {
1686 if (nl->srvid == notify->srvid) {
1687 break;
1690 if (nl == NULL) {
1691 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1692 return -1;
1695 DLIST_REMOVE(client->notify, nl);
1696 talloc_set_destructor(nl, NULL);
1697 talloc_free(nl);
1699 return 0;
1702 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1704 struct ctdb_client_pid_list *client_pid;
1706 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1707 if (client_pid->pid == pid) {
1708 return client_pid->client;
1711 return NULL;
1715 /* This control is used by samba when probing if a process (of a samba daemon)
1716 exists on the node.
1717 Samba does this when it needs/wants to check if a subrecord in one of the
1718 databases is still valied, or if it is stale and can be removed.
1719 If the node is in unhealthy or stopped state we just kill of the samba
1720 process holding htis sub-record and return to the calling samba that
1721 the process does not exist.
1722 This allows us to forcefully recall subrecords registered by samba processes
1723 on banned and stopped nodes.
1725 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1727 struct ctdb_client *client;
1729 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1730 client = ctdb_find_client_by_pid(ctdb, pid);
1731 if (client != NULL) {
1732 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1733 talloc_free(client);
1735 return -1;
1738 return kill(pid, 0);
1741 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1743 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1744 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1745 return;
1748 DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1749 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1750 ctdb_stop_recoverd(ctdb);
1751 ctdb_stop_keepalive(ctdb);
1752 ctdb_stop_monitoring(ctdb);
1753 ctdb_release_all_ips(ctdb);
1754 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1755 if (ctdb->methods != NULL) {
1756 ctdb->methods->shutdown(ctdb);
1759 DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1760 exit(exit_code);