ctdb-daemon: Drop attempt to connect to Unix domain socket
[Samba.git] / ctdb / server / ctdb_daemon.c
blob82a203b5366c344aefa9dc3d2d10d560f686429e
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/blocking.h"
38 #include "ctdb_version.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
42 #include "common/rb_tree.h"
43 #include "common/reqid.h"
44 #include "common/system.h"
45 #include "common/common.h"
46 #include "common/logging.h"
47 #include "common/pidfile.h"
/*
  One entry in the daemon's doubly linked list of connected client
  processes (ctdb->client_pids).
*/
struct ctdb_client_pid_list {
	/* dlinklist linkage */
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	/* daemon context whose client_pids list holds this entry */
	struct ctdb_context *ctdb;
	/* peer PID recorded when the connection was accepted */
	pid_t pid;
	/* the client connection this entry describes */
	struct ctdb_client *client;
};
/* Path of the daemon's PID file; no PID file is created while this is NULL. */
const char *ctdbd_pidfile = NULL;

/* Context filled in by pidfile_create(); released by ctdb_remove_pidfile(). */
static struct pidfile_context *ctdbd_pidfile_ctx = NULL;

/* Forward declaration: dispatcher for packets received from local clients. */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
61 static void print_exit_message(void)
63 if (debug_extra != NULL && debug_extra[0] != '\0') {
64 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
65 } else {
66 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
68 /* Wait a second to allow pending log messages to be flushed */
69 sleep(1);
75 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
76 struct timeval t, void *private_data)
78 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
80 if (getpid() != ctdb->ctdbd_pid) {
81 return;
84 tevent_add_timer(ctdb->ev, ctdb,
85 timeval_current_ofs(1, 0),
86 ctdb_time_tick, ctdb);
89 /* Used to trigger a dummy event once per second, to make
90 * detection of hangs more reliable.
92 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
94 tevent_add_timer(ctdb->ev, ctdb,
95 timeval_current_ofs(1, 0),
96 ctdb_time_tick, ctdb);
/*
  Kick off the daemon's recurring background activities, in this order:
  keepalives, tcp tickle list updates, recovery daemon ping handling and
  the once-per-second timer tick.
*/
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* monitoring for connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* periodic update of tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* listening for recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* listening to timer ticks */
	ctdb_start_time_tickd(ctdb);
}
114 static void ignore_signal(int signum)
116 struct sigaction act;
118 memset(&act, 0, sizeof(act));
120 act.sa_handler = SIG_IGN;
121 sigemptyset(&act.sa_mask);
122 sigaddset(&act.sa_mask, signum);
123 sigaction(signum, &act, NULL);
128 send a packet to a client
130 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
132 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
133 if (hdr->operation == CTDB_REQ_MESSAGE) {
134 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
135 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
136 talloc_free(client);
137 return -1;
140 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
144 message handler for when we are in daemon mode. This redirects the message
145 to the right client
147 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
148 void *private_data)
150 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
151 struct ctdb_req_message_old *r;
152 int len;
154 /* construct a message to send to the client containing the data */
155 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
156 r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
157 len, struct ctdb_req_message_old);
158 CTDB_NO_MEMORY_VOID(client->ctdb, r);
160 talloc_set_name_const(r, "req_message packet");
162 r->srvid = srvid;
163 r->datalen = data.dsize;
164 memcpy(&r->data[0], data.dptr, data.dsize);
166 daemon_queue_send(client, &r->hdr);
168 talloc_free(r);
172 this is called when the ctdb daemon received a ctdb request to
173 set the srvid from the client
175 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
177 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
178 int res;
179 if (client == NULL) {
180 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
181 return -1;
183 res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
184 client);
185 if (res != 0) {
186 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
187 (unsigned long long)srvid));
188 } else {
189 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
190 (unsigned long long)srvid));
193 return res;
197 this is called when the ctdb daemon received a ctdb request to
198 remove a srvid from the client
200 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
202 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
203 if (client == NULL) {
204 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
205 return -1;
207 return srvid_deregister(ctdb->srv, srvid, client);
210 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
211 TDB_DATA *outdata)
213 uint64_t *ids;
214 int i, num_ids;
215 uint8_t *results;
217 if ((indata.dsize % sizeof(uint64_t)) != 0) {
218 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
219 "size=%d\n", (int)indata.dsize));
220 return -1;
223 ids = (uint64_t *)indata.dptr;
224 num_ids = indata.dsize / 8;
226 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
227 if (results == NULL) {
228 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
229 return -1;
231 for (i=0; i<num_ids; i++) {
232 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
233 results[i/8] |= (1 << (i%8));
236 outdata->dptr = (uint8_t *)results;
237 outdata->dsize = talloc_get_size(results);
238 return 0;
242 destroy a ctdb_client
244 static int ctdb_client_destructor(struct ctdb_client *client)
246 struct ctdb_db_context *ctdb_db;
248 ctdb_takeover_client_destructor_hook(client);
249 reqid_remove(client->ctdb->idr, client->client_id);
250 client->ctdb->num_clients--;
252 if (client->num_persistent_updates != 0) {
253 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
254 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
256 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
257 if (ctdb_db) {
258 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
259 "commit active. Forcing recovery.\n"));
260 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
263 * trans3 transaction state:
265 * The destructor sets the pointer to NULL.
267 talloc_free(ctdb_db->persistent_state);
270 return 0;
275 this is called when the ctdb daemon received a ctdb request message
276 from a local client over the unix domain socket
278 static void daemon_request_message_from_client(struct ctdb_client *client,
279 struct ctdb_req_message_old *c)
281 TDB_DATA data;
282 int res;
284 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
285 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
288 /* maybe the message is for another client on this node */
289 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
290 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
291 return;
294 /* its for a remote node */
295 data.dptr = &c->data[0];
296 data.dsize = c->datalen;
297 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
298 c->srvid, data);
299 if (res != 0) {
300 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
301 c->hdr.destnode));
/*
  Per-request state kept by the daemon while a call issued by a local
  client is in progress.
*/
struct daemon_call_state {
	/* client that issued the call */
	struct ctdb_client *client;
	/* reqid from the client's request header, echoed in the reply */
	uint32_t reqid;
	/* the call being executed */
	struct ctdb_call *call;
	/* when processing started; used for latency accounting */
	struct timeval start_time;

	/* non-zero when the client asked for a readonly record */
	uint32_t readonly_fetch;
	/* call id the client originally asked for (set for readonly requests) */
	uint32_t client_callid;
};
318 complete a call from a client
320 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
322 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
323 struct daemon_call_state);
324 struct ctdb_reply_call_old *r;
325 int res;
326 uint32_t length;
327 struct ctdb_client *client = dstate->client;
328 struct ctdb_db_context *ctdb_db = state->ctdb_db;
330 talloc_steal(client, dstate);
331 talloc_steal(dstate, dstate->call);
333 res = ctdb_daemon_call_recv(state, dstate->call);
334 if (res != 0) {
335 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
336 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
338 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
339 return;
342 length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
343 /* If the client asked for readonly FETCH, we remapped this to
344 FETCH_WITH_HEADER when calling the daemon. So we must
345 strip the extra header off the reply data before passing
346 it back to the client.
348 if (dstate->readonly_fetch
349 && dstate->client_callid == CTDB_FETCH_FUNC) {
350 length -= sizeof(struct ctdb_ltdb_header);
353 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
354 length, struct ctdb_reply_call_old);
355 if (r == NULL) {
356 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
357 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
358 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
359 return;
361 r->hdr.reqid = dstate->reqid;
362 r->status = dstate->call->status;
364 if (dstate->readonly_fetch
365 && dstate->client_callid == CTDB_FETCH_FUNC) {
366 /* client only asked for a FETCH so we must strip off
367 the extra ctdb_ltdb header
369 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
370 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
371 } else {
372 r->datalen = dstate->call->reply_data.dsize;
373 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
376 res = daemon_queue_send(client, &r->hdr);
377 if (res == -1) {
378 /* client is dead - return immediately */
379 return;
381 if (res != 0) {
382 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
384 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
385 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
386 talloc_free(dstate);
/*
  Identifies a client by id rather than by pointer, so that a packet which
  is re-delivered later can detect that the client has disconnected in
  the meantime.
*/
struct ctdb_daemon_packet_wrap {
	/* daemon context whose idr maps client_id to a client */
	struct ctdb_context *ctdb;
	/* request id of the client the packet belongs to */
	uint32_t client_id;
};
395 a wrapper to catch disconnected clients
397 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
399 struct ctdb_client *client;
400 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
401 struct ctdb_daemon_packet_wrap);
402 if (w == NULL) {
403 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
404 return;
407 client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
408 if (client == NULL) {
409 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
410 w->client_id));
411 talloc_free(w);
412 return;
414 talloc_free(w);
416 /* process it */
417 daemon_incoming_packet(client, hdr);
/* One client call that is waiting for an in-flight fetch of the same key. */
struct ctdb_deferred_fetch_call {
	/* dlinklist linkage within the owning queue */
	struct ctdb_deferred_fetch_call *next;
	struct ctdb_deferred_fetch_call *prev;
	/* the deferred request packet */
	struct ctdb_req_call_old *c;
	/* identifies the client that sent the request */
	struct ctdb_daemon_packet_wrap *w;
};

/* The calls deferred behind one in-flight fetch. */
struct ctdb_deferred_fetch_queue {
	struct ctdb_deferred_fetch_call *deferred_calls;
};

/* Carries a deferred call and its client to reprocess_deferred_call(). */
struct ctdb_deferred_requeue {
	struct ctdb_deferred_fetch_call *dfc;
	struct ctdb_client *client;
};
435 /* called from a timer event and starts reprocessing the deferred call.*/
436 static void reprocess_deferred_call(struct tevent_context *ev,
437 struct tevent_timer *te,
438 struct timeval t, void *private_data)
440 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
441 struct ctdb_client *client = dfr->client;
443 talloc_steal(client, dfr->dfc->c);
444 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
445 talloc_free(dfr);
448 /* the referral context is destroyed either after a timeout or when the initial
449 fetch-lock has finished.
450 at this stage, immediately start reprocessing the queued up deferred
451 calls so they get reprocessed immediately (and since we are dmaster at
452 this stage, trigger the waiting smbd processes to pick up and aquire the
453 record right away.
455 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
458 /* need to reprocess the packets from the queue explicitely instead of
459 just using a normal destructor since we want, need, to
460 call the clients in the same oder as the requests queued up
462 while (dfq->deferred_calls != NULL) {
463 struct ctdb_client *client;
464 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
465 struct ctdb_deferred_requeue *dfr;
467 DLIST_REMOVE(dfq->deferred_calls, dfc);
469 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
470 if (client == NULL) {
471 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
472 dfc->w->client_id));
473 continue;
476 /* process it by pushing it back onto the eventloop */
477 dfr = talloc(client, struct ctdb_deferred_requeue);
478 if (dfr == NULL) {
479 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
480 continue;
483 dfr->dfc = talloc_steal(dfr, dfc);
484 dfr->client = client;
486 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
487 reprocess_deferred_call, dfr);
490 return 0;
493 /* insert the new deferral context into the rb tree.
494 there should never be a pre-existing context here, but check for it
495 warn and destroy the previous context if there is already a deferral context
496 for this key.
498 static void *insert_dfq_callback(void *parm, void *data)
500 if (data) {
501 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
502 talloc_free(data);
504 return parm;
/*
  Timer callback used when the original fetch-lock did not complete within
  a reasonable time: freeing the deferral context (private_data) causes
  all deferred requests to be re-inserted into the event system.
*/
static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
			struct timeval t, void *private_data)
{
	talloc_free(private_data);
}
517 /* This function is used in the local daemon to register a KEY in a database
518 for being "fetched"
519 While the remote fetch is in-flight, any futher attempts to re-fetch the
520 same record will be deferred until the fetch completes.
522 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
524 uint32_t *k;
525 struct ctdb_deferred_fetch_queue *dfq;
527 k = ctdb_key_to_idkey(call, call->key);
528 if (k == NULL) {
529 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
530 return -1;
533 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
534 if (dfq == NULL) {
535 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
536 talloc_free(k);
537 return -1;
539 dfq->deferred_calls = NULL;
541 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
543 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
545 /* if the fetch havent completed in 30 seconds, just tear it all down
546 and let it try again as the events are reissued */
547 tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
548 dfq_timeout, dfq);
550 talloc_free(k);
551 return 0;
554 /* check if this is a duplicate request to a fetch already in-flight
555 if it is, make this call deferred to be reprocessed later when
556 the in-flight fetch completes.
558 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
560 uint32_t *k;
561 struct ctdb_deferred_fetch_queue *dfq;
562 struct ctdb_deferred_fetch_call *dfc;
564 k = ctdb_key_to_idkey(c, key);
565 if (k == NULL) {
566 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
567 return -1;
570 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
571 if (dfq == NULL) {
572 talloc_free(k);
573 return -1;
577 talloc_free(k);
579 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
580 if (dfc == NULL) {
581 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
582 return -1;
585 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
586 if (dfc->w == NULL) {
587 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
588 talloc_free(dfc);
589 return -1;
592 dfc->c = talloc_steal(dfc, c);
593 dfc->w->ctdb = ctdb_db->ctdb;
594 dfc->w->client_id = client->client_id;
596 DLIST_ADD_END(dfq->deferred_calls, dfc);
598 return 0;
603 this is called when the ctdb daemon received a ctdb request call
604 from a local client over the unix domain socket
606 static void daemon_request_call_from_client(struct ctdb_client *client,
607 struct ctdb_req_call_old *c)
609 struct ctdb_call_state *state;
610 struct ctdb_db_context *ctdb_db;
611 struct daemon_call_state *dstate;
612 struct ctdb_call *call;
613 struct ctdb_ltdb_header header;
614 TDB_DATA key, data;
615 int ret;
616 struct ctdb_context *ctdb = client->ctdb;
617 struct ctdb_daemon_packet_wrap *w;
619 CTDB_INCREMENT_STAT(ctdb, total_calls);
620 CTDB_INCREMENT_STAT(ctdb, pending_calls);
622 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
623 if (!ctdb_db) {
624 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
625 c->db_id));
626 CTDB_DECREMENT_STAT(ctdb, pending_calls);
627 return;
630 if (ctdb_db->unhealthy_reason) {
632 * this is just a warning, as the tdb should be empty anyway,
633 * and only persistent databases can be unhealthy, which doesn't
634 * use this code patch
636 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
637 ctdb_db->db_name, ctdb_db->unhealthy_reason));
640 key.dptr = c->data;
641 key.dsize = c->keylen;
643 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
644 CTDB_NO_MEMORY_VOID(ctdb, w);
646 w->ctdb = ctdb;
647 w->client_id = client->client_id;
649 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
650 (struct ctdb_req_header *)c, &data,
651 daemon_incoming_packet_wrap, w, true);
652 if (ret == -2) {
653 /* will retry later */
654 CTDB_DECREMENT_STAT(ctdb, pending_calls);
655 return;
658 talloc_free(w);
660 if (ret != 0) {
661 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
662 CTDB_DECREMENT_STAT(ctdb, pending_calls);
663 return;
667 /* check if this fetch request is a duplicate for a
668 request we already have in flight. If so defer it until
669 the first request completes.
671 if (ctdb->tunable.fetch_collapse == 1) {
672 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
673 ret = ctdb_ltdb_unlock(ctdb_db, key);
674 if (ret != 0) {
675 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
677 CTDB_DECREMENT_STAT(ctdb, pending_calls);
678 return;
682 /* Dont do READONLY if we don't have a tracking database */
683 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
684 c->flags &= ~CTDB_WANT_READONLY;
687 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
688 header.flags &= ~CTDB_REC_RO_FLAGS;
689 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
690 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
691 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
692 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
694 /* and clear out the tracking data */
695 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
696 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
700 /* if we are revoking, we must defer all other calls until the revoke
701 * had completed.
703 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
704 talloc_free(data.dptr);
705 ret = ctdb_ltdb_unlock(ctdb_db, key);
707 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
708 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
710 CTDB_DECREMENT_STAT(ctdb, pending_calls);
711 return;
714 if ((header.dmaster == ctdb->pnn)
715 && (!(c->flags & CTDB_WANT_READONLY))
716 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
717 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
718 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
719 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
721 ret = ctdb_ltdb_unlock(ctdb_db, key);
723 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
724 ctdb_fatal(ctdb, "Failed to start record revoke");
726 talloc_free(data.dptr);
728 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
729 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
732 CTDB_DECREMENT_STAT(ctdb, pending_calls);
733 return;
736 dstate = talloc(client, struct daemon_call_state);
737 if (dstate == NULL) {
738 ret = ctdb_ltdb_unlock(ctdb_db, key);
739 if (ret != 0) {
740 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
743 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
744 CTDB_DECREMENT_STAT(ctdb, pending_calls);
745 return;
747 dstate->start_time = timeval_current();
748 dstate->client = client;
749 dstate->reqid = c->hdr.reqid;
750 talloc_steal(dstate, data.dptr);
752 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
753 if (call == NULL) {
754 ret = ctdb_ltdb_unlock(ctdb_db, key);
755 if (ret != 0) {
756 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
759 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
760 CTDB_DECREMENT_STAT(ctdb, pending_calls);
761 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
762 return;
765 dstate->readonly_fetch = 0;
766 call->call_id = c->callid;
767 call->key = key;
768 call->call_data.dptr = c->data + c->keylen;
769 call->call_data.dsize = c->calldatalen;
770 call->flags = c->flags;
772 if (c->flags & CTDB_WANT_READONLY) {
773 /* client wants readonly record, so translate this into a
774 fetch with header. remember what the client asked for
775 so we can remap the reply back to the proper format for
776 the client in the reply
778 dstate->client_callid = call->call_id;
779 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
780 dstate->readonly_fetch = 1;
783 if (header.dmaster == ctdb->pnn) {
784 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
785 } else {
786 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
787 if (ctdb->tunable.fetch_collapse == 1) {
788 /* This request triggered a remote fetch-lock.
789 set up a deferral for this key so any additional
790 fetch-locks are deferred until the current one
791 finishes.
793 setup_deferred_fetch_locks(ctdb_db, call);
797 ret = ctdb_ltdb_unlock(ctdb_db, key);
798 if (ret != 0) {
799 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
802 if (state == NULL) {
803 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
804 CTDB_DECREMENT_STAT(ctdb, pending_calls);
805 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
806 return;
808 talloc_steal(state, dstate);
809 talloc_steal(client, state);
811 state->async.fn = daemon_call_from_client_callback;
812 state->async.private_data = dstate;
816 static void daemon_request_control_from_client(struct ctdb_client *client,
817 struct ctdb_req_control_old *c);
819 /* data contains a packet from the client */
820 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
822 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
823 TALLOC_CTX *tmp_ctx;
824 struct ctdb_context *ctdb = client->ctdb;
826 /* place the packet as a child of a tmp_ctx. We then use
827 talloc_free() below to free it. If any of the calls want
828 to keep it, then they will steal it somewhere else, and the
829 talloc_free() will be a no-op */
830 tmp_ctx = talloc_new(client);
831 talloc_steal(tmp_ctx, hdr);
833 if (hdr->ctdb_magic != CTDB_MAGIC) {
834 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
835 goto done;
838 if (hdr->ctdb_version != CTDB_PROTOCOL) {
839 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
840 goto done;
843 switch (hdr->operation) {
844 case CTDB_REQ_CALL:
845 CTDB_INCREMENT_STAT(ctdb, client.req_call);
846 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
847 break;
849 case CTDB_REQ_MESSAGE:
850 CTDB_INCREMENT_STAT(ctdb, client.req_message);
851 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
852 break;
854 case CTDB_REQ_CONTROL:
855 CTDB_INCREMENT_STAT(ctdb, client.req_control);
856 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
857 break;
859 default:
860 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
861 hdr->operation));
864 done:
865 talloc_free(tmp_ctx);
869 called when the daemon gets a incoming packet
871 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
873 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
874 struct ctdb_req_header *hdr;
876 if (cnt == 0) {
877 talloc_free(client);
878 return;
881 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
883 if (cnt < sizeof(*hdr)) {
884 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
885 (unsigned)cnt);
886 return;
888 hdr = (struct ctdb_req_header *)data;
889 if (cnt != hdr->length) {
890 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
891 (unsigned)hdr->length, (unsigned)cnt);
892 return;
895 if (hdr->ctdb_magic != CTDB_MAGIC) {
896 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
897 return;
900 if (hdr->ctdb_version != CTDB_PROTOCOL) {
901 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
902 return;
905 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
906 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
907 hdr->srcnode, hdr->destnode));
909 /* it is the responsibility of the incoming packet function to free 'data' */
910 daemon_incoming_packet(client, hdr);
914 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
916 if (client_pid->ctdb->client_pids != NULL) {
917 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
920 return 0;
924 static void ctdb_accept_client(struct tevent_context *ev,
925 struct tevent_fd *fde, uint16_t flags,
926 void *private_data)
928 struct sockaddr_un addr;
929 socklen_t len;
930 int fd;
931 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
932 struct ctdb_client *client;
933 struct ctdb_client_pid_list *client_pid;
934 pid_t peer_pid = 0;
935 int ret;
937 memset(&addr, 0, sizeof(addr));
938 len = sizeof(addr);
939 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
940 if (fd == -1) {
941 return;
944 ret = set_blocking(fd, false);
945 if (ret != 0) {
946 DEBUG(DEBUG_ERR,
947 (__location__
948 " failed to set socket non-blocking (%s)\n",
949 strerror(errno)));
950 close(fd);
951 return;
954 set_close_on_exec(fd);
956 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
958 client = talloc_zero(ctdb, struct ctdb_client);
959 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
960 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
963 client->ctdb = ctdb;
964 client->fd = fd;
965 client->client_id = reqid_new(ctdb->idr, client);
966 client->pid = peer_pid;
968 client_pid = talloc(client, struct ctdb_client_pid_list);
969 if (client_pid == NULL) {
970 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
971 close(fd);
972 talloc_free(client);
973 return;
975 client_pid->ctdb = ctdb;
976 client_pid->pid = peer_pid;
977 client_pid->client = client;
979 DLIST_ADD(ctdb->client_pids, client_pid);
981 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
982 ctdb_daemon_read_cb, client,
983 "client-%u", client->pid);
985 talloc_set_destructor(client, ctdb_client_destructor);
986 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
987 ctdb->num_clients++;
993 create a unix domain socket and bind it
994 return a file descriptor open on the socket
996 static int ux_socket_bind(struct ctdb_context *ctdb)
998 struct sockaddr_un addr;
999 int ret;
1001 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1002 if (ctdb->daemon.sd == -1) {
1003 return -1;
1006 memset(&addr, 0, sizeof(addr));
1007 addr.sun_family = AF_UNIX;
1008 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1010 /* Remove any old socket */
1011 unlink(ctdb->daemon.name);
1013 set_close_on_exec(ctdb->daemon.sd);
1015 ret = set_blocking(ctdb->daemon.sd, false);
1016 if (ret != 0) {
1017 DEBUG(DEBUG_ERR,
1018 (__location__
1019 " failed to set socket non-blocking (%s)\n",
1020 strerror(errno)));
1021 goto failed;
1024 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1025 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1026 goto failed;
1029 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1030 chmod(ctdb->daemon.name, 0700) != 0) {
1031 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1032 goto failed;
1036 if (listen(ctdb->daemon.sd, 100) != 0) {
1037 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1038 goto failed;
1041 DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1042 ctdb->daemon.name));
1043 return 0;
1045 failed:
1046 close(ctdb->daemon.sd);
1047 ctdb->daemon.sd = -1;
1048 return -1;
1051 static void initialise_node_flags (struct ctdb_context *ctdb)
1053 if (ctdb->pnn == -1) {
1054 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1057 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1059 /* do we start out in DISABLED mode? */
1060 if (ctdb->start_as_disabled != 0) {
1061 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1062 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1064 /* do we start out in STOPPED mode? */
1065 if (ctdb->start_as_stopped != 0) {
1066 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1067 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1071 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1072 void *private_data)
1074 if (status != 0) {
1075 ctdb_die(ctdb, "Failed to run setup event");
1077 ctdb_run_notification_script(ctdb, "setup");
1079 /* tell all other nodes we've just started up */
1080 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1081 0, CTDB_CONTROL_STARTUP, 0,
1082 CTDB_CTRL_FLAG_NOREPLY,
1083 tdb_null, NULL, NULL);
1085 /* Start the recovery daemon */
1086 if (ctdb_start_recoverd(ctdb) != 0) {
1087 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1088 exit(11);
1091 ctdb_start_periodic_events(ctdb);
1093 ctdb_wait_for_first_recovery(ctdb);
1096 static struct timeval tevent_before_wait_ts;
1097 static struct timeval tevent_after_wait_ts;
1099 static void ctdb_tevent_trace_init(void)
1101 struct timeval now;
1103 now = timeval_current();
1105 tevent_before_wait_ts = now;
1106 tevent_after_wait_ts = now;
1109 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1110 void *private_data)
1112 struct timeval diff;
1113 struct timeval now;
1114 struct ctdb_context *ctdb =
1115 talloc_get_type(private_data, struct ctdb_context);
1117 if (getpid() != ctdb->ctdbd_pid) {
1118 return;
1121 now = timeval_current();
1123 switch (tp) {
1124 case TEVENT_TRACE_BEFORE_WAIT:
1125 diff = timeval_until(&tevent_after_wait_ts, &now);
1126 if (diff.tv_sec > 3) {
1127 DEBUG(DEBUG_ERR,
1128 ("Handling event took %ld seconds!\n",
1129 diff.tv_sec));
1131 tevent_before_wait_ts = now;
1132 break;
1134 case TEVENT_TRACE_AFTER_WAIT:
1135 diff = timeval_until(&tevent_before_wait_ts, &now);
1136 if (diff.tv_sec > 3) {
1137 DEBUG(DEBUG_ERR,
1138 ("No event for %ld seconds!\n",
1139 diff.tv_sec));
1141 tevent_after_wait_ts = now;
1142 break;
1144 default:
1145 /* Do nothing for future tevent trace points */ ;
1149 static void ctdb_remove_pidfile(void)
1151 TALLOC_FREE(ctdbd_pidfile_ctx);
1154 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1156 if (ctdbd_pidfile != NULL) {
1157 int ret = pidfile_create(mem_ctx, ctdbd_pidfile,
1158 &ctdbd_pidfile_ctx);
1159 if (ret != 0) {
1160 DEBUG(DEBUG_ERR,
1161 ("Failed to create PID file %s\n",
1162 ctdbd_pidfile));
1163 exit(11);
1166 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1167 atexit(ctdb_remove_pidfile);
1171 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1173 int i, j, count;
1175 /* initialize the vnn mapping table, skipping any deleted nodes */
1176 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1177 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1179 count = 0;
1180 for (i = 0; i < ctdb->num_nodes; i++) {
1181 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1182 count++;
1186 ctdb->vnn_map->generation = INVALID_GENERATION;
1187 ctdb->vnn_map->size = count;
1188 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1189 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1191 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1192 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1193 continue;
1195 ctdb->vnn_map->map[j] = i;
1196 j++;
1200 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1202 int nodeid;
1204 if (ctdb->address == NULL) {
1205 ctdb_fatal(ctdb,
1206 "Can not determine PNN - node address is not set\n");
1209 nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1210 if (nodeid == -1) {
1211 ctdb_fatal(ctdb,
1212 "Can not determine PNN - node address not found in node list\n");
1215 ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1216 DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1220 start the protocol going as a daemon
1222 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1224 int res, ret = -1;
1225 struct tevent_fd *fde;
1227 if (do_fork && fork()) {
1228 return 0;
1231 if (do_fork) {
1232 if (setsid() == -1) {
1233 ctdb_die(ctdb, "Failed to setsid()\n");
1235 close(0);
1236 if (open("/dev/null", O_RDONLY) != 0) {
1237 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1238 exit(11);
1241 ignore_signal(SIGPIPE);
1242 ignore_signal(SIGUSR1);
1244 ctdb->ctdbd_pid = getpid();
1245 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1246 CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1247 ctdb_create_pidfile(ctdb);
1249 /* create a unix domain stream socket to listen to */
1250 res = ux_socket_bind(ctdb);
1251 if (res!=0) {
1252 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1253 exit(10);
1256 /* Make sure we log something when the daemon terminates.
1257 * This must be the first exit handler to run (so the last to
1258 * be registered.
1260 atexit(print_exit_message);
1262 if (ctdb->do_setsched) {
1263 /* try to set us up as realtime */
1264 if (!set_scheduler()) {
1265 exit(1);
1267 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1270 ctdb->ev = tevent_context_init(NULL);
1271 if (ctdb->ev == NULL) {
1272 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1273 exit(1);
1275 tevent_loop_allow_nesting(ctdb->ev);
1276 ctdb_tevent_trace_init();
1277 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1278 ret = ctdb_init_tevent_logging(ctdb);
1279 if (ret != 0) {
1280 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1281 exit(1);
1284 /* set up a handler to pick up sigchld */
1285 if (ctdb_init_sigchld(ctdb) == NULL) {
1286 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1287 exit(1);
1290 ctdb_set_child_logging(ctdb);
1292 TALLOC_FREE(ctdb->srv);
1293 if (srvid_init(ctdb, &ctdb->srv) != 0) {
1294 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1295 exit(1);
1298 /* initialize statistics collection */
1299 ctdb_statistics_init(ctdb);
1301 /* force initial recovery for election */
1302 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1304 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1305 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1306 if (ret != 0) {
1307 ctdb_die(ctdb, "Failed to run init event\n");
1309 ctdb_run_notification_script(ctdb, "init");
1311 if (strcmp(ctdb->transport, "tcp") == 0) {
1312 ret = ctdb_tcp_init(ctdb);
1314 #ifdef USE_INFINIBAND
1315 if (strcmp(ctdb->transport, "ib") == 0) {
1316 ret = ctdb_ibw_init(ctdb);
1318 #endif
1319 if (ret != 0) {
1320 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1321 return -1;
1324 if (ctdb->methods == NULL) {
1325 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1326 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1329 /* Initialise the transport. This sets the node address if it
1330 * was not set via the command-line. */
1331 if (ctdb->methods->initialise(ctdb) != 0) {
1332 ctdb_fatal(ctdb, "transport failed to initialise");
1335 ctdb_set_my_pnn(ctdb);
1337 initialise_node_flags(ctdb);
1339 if (ctdb->public_addresses_file) {
1340 ret = ctdb_set_public_addresses(ctdb, true);
1341 if (ret == -1) {
1342 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1343 exit(1);
1347 ctdb_initialise_vnn_map(ctdb);
1349 /* attach to existing databases */
1350 if (ctdb_attach_databases(ctdb) != 0) {
1351 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1354 /* start frozen, then let the first election sort things out */
1355 if (!ctdb_blocking_freeze(ctdb)) {
1356 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1359 /* now start accepting clients, only can do this once frozen */
1360 fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1361 ctdb_accept_client, ctdb);
1362 if (fde == NULL) {
1363 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1365 tevent_fd_set_auto_close(fde);
1367 /* Start the transport */
1368 if (ctdb->methods->start(ctdb) != 0) {
1369 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1370 ctdb_fatal(ctdb, "transport failed to start");
1373 /* Recovery daemon and timed events are started from the
1374 * callback, only after the setup event completes
1375 * successfully.
1377 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1378 ret = ctdb_event_script_callback(ctdb,
1379 ctdb,
1380 ctdb_setup_event_callback,
1381 ctdb,
1382 CTDB_EVENT_SETUP,
1383 "%s",
1384 "");
1385 if (ret != 0) {
1386 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1387 exit(1);
1390 lockdown_memory(ctdb->valgrinding);
1392 /* go into a wait loop to allow other nodes to complete */
1393 tevent_loop_wait(ctdb->ev);
1395 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1396 exit(1);
1400 allocate a packet for use in daemon<->daemon communication
1402 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1403 TALLOC_CTX *mem_ctx,
1404 enum ctdb_operation operation,
1405 size_t length, size_t slength,
1406 const char *type)
1408 int size;
1409 struct ctdb_req_header *hdr;
1411 length = MAX(length, slength);
1412 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1414 if (ctdb->methods == NULL) {
1415 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1416 operation, (unsigned)length));
1417 return NULL;
1420 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1421 if (hdr == NULL) {
1422 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1423 operation, (unsigned)length));
1424 return NULL;
1426 talloc_set_name_const(hdr, type);
1427 memset(hdr, 0, slength);
1428 hdr->length = length;
1429 hdr->operation = operation;
1430 hdr->ctdb_magic = CTDB_MAGIC;
1431 hdr->ctdb_version = CTDB_PROTOCOL;
1432 hdr->generation = ctdb->vnn_map->generation;
1433 hdr->srcnode = ctdb->pnn;
1435 return hdr;
/* State kept for a control that a local client asked the daemon to send */
struct daemon_control_state {
	struct daemon_control_state *next, *prev; /* links in node->pending_controls */
	struct ctdb_client *client;               /* client that issued the control */
	struct ctdb_req_control_old *c;           /* the request packet (owned by this state) */
	uint32_t reqid;                           /* request id to echo in the reply */
	struct ctdb_node *node;                   /* destination node, NULL if not a valid pnn */
};
1447 callback when a control reply comes in
1449 static void daemon_control_callback(struct ctdb_context *ctdb,
1450 int32_t status, TDB_DATA data,
1451 const char *errormsg,
1452 void *private_data)
1454 struct daemon_control_state *state = talloc_get_type(private_data,
1455 struct daemon_control_state);
1456 struct ctdb_client *client = state->client;
1457 struct ctdb_reply_control_old *r;
1458 size_t len;
1459 int ret;
1461 /* construct a message to send to the client containing the data */
1462 len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1463 if (errormsg) {
1464 len += strlen(errormsg);
1466 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1467 struct ctdb_reply_control_old);
1468 CTDB_NO_MEMORY_VOID(ctdb, r);
1470 r->hdr.reqid = state->reqid;
1471 r->status = status;
1472 r->datalen = data.dsize;
1473 r->errorlen = 0;
1474 memcpy(&r->data[0], data.dptr, data.dsize);
1475 if (errormsg) {
1476 r->errorlen = strlen(errormsg);
1477 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1480 ret = daemon_queue_send(client, &r->hdr);
1481 if (ret != -1) {
1482 talloc_free(state);
1487 fail all pending controls to a disconnected node
1489 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1491 struct daemon_control_state *state;
1492 while ((state = node->pending_controls)) {
1493 DLIST_REMOVE(node->pending_controls, state);
1494 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1495 "node is disconnected", state);
1500 destroy a daemon_control_state
1502 static int daemon_control_destructor(struct daemon_control_state *state)
1504 if (state->node) {
1505 DLIST_REMOVE(state->node->pending_controls, state);
1507 return 0;
1511 this is called when the ctdb daemon received a ctdb request control
1512 from a local client over the unix domain socket
1514 static void daemon_request_control_from_client(struct ctdb_client *client,
1515 struct ctdb_req_control_old *c)
1517 TDB_DATA data;
1518 int res;
1519 struct daemon_control_state *state;
1520 TALLOC_CTX *tmp_ctx = talloc_new(client);
1522 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1523 c->hdr.destnode = client->ctdb->pnn;
1526 state = talloc(client, struct daemon_control_state);
1527 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1529 state->client = client;
1530 state->c = talloc_steal(state, c);
1531 state->reqid = c->hdr.reqid;
1532 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1533 state->node = client->ctdb->nodes[c->hdr.destnode];
1534 DLIST_ADD(state->node->pending_controls, state);
1535 } else {
1536 state->node = NULL;
1539 talloc_set_destructor(state, daemon_control_destructor);
1541 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1542 talloc_steal(tmp_ctx, state);
1545 data.dptr = &c->data[0];
1546 data.dsize = c->datalen;
1547 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1548 c->srvid, c->opcode, client->client_id,
1549 c->flags,
1550 data, daemon_control_callback,
1551 state);
1552 if (res != 0) {
1553 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1554 c->hdr.destnode));
1557 talloc_free(tmp_ctx);
1561 register a call function
1563 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1564 ctdb_fn_t fn, int id)
1566 struct ctdb_registered_call *call;
1567 struct ctdb_db_context *ctdb_db;
1569 ctdb_db = find_ctdb_db(ctdb, db_id);
1570 if (ctdb_db == NULL) {
1571 return -1;
1574 call = talloc(ctdb_db, struct ctdb_registered_call);
1575 call->fn = fn;
1576 call->id = id;
1578 DLIST_ADD(ctdb_db->calls, call);
1579 return 0;
1585 this local messaging handler is ugly, but is needed to prevent
1586 recursion in ctdb_send_message() when the destination node is the
1587 same as the source node
1589 struct ctdb_local_message {
1590 struct ctdb_context *ctdb;
1591 uint64_t srvid;
1592 TDB_DATA data;
1595 static void ctdb_local_message_trigger(struct tevent_context *ev,
1596 struct tevent_timer *te,
1597 struct timeval t, void *private_data)
1599 struct ctdb_local_message *m = talloc_get_type(
1600 private_data, struct ctdb_local_message);
1602 srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1603 talloc_free(m);
1606 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1608 struct ctdb_local_message *m;
1609 m = talloc(ctdb, struct ctdb_local_message);
1610 CTDB_NO_MEMORY(ctdb, m);
1612 m->ctdb = ctdb;
1613 m->srvid = srvid;
1614 m->data = data;
1615 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1616 if (m->data.dptr == NULL) {
1617 talloc_free(m);
1618 return -1;
1621 /* this needs to be done as an event to prevent recursion */
1622 tevent_add_timer(ctdb->ev, m, timeval_zero(),
1623 ctdb_local_message_trigger, m);
1624 return 0;
1628 send a ctdb message
1630 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1631 uint64_t srvid, TDB_DATA data)
1633 struct ctdb_req_message_old *r;
1634 int len;
1636 if (ctdb->methods == NULL) {
1637 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1638 return -1;
1641 /* see if this is a message to ourselves */
1642 if (pnn == ctdb->pnn) {
1643 return ctdb_local_message(ctdb, srvid, data);
1646 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1647 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1648 struct ctdb_req_message_old);
1649 CTDB_NO_MEMORY(ctdb, r);
1651 r->hdr.destnode = pnn;
1652 r->srvid = srvid;
1653 r->datalen = data.dsize;
1654 memcpy(&r->data[0], data.dptr, data.dsize);
1656 ctdb_queue_packet(ctdb, &r->hdr);
1658 talloc_free(r);
1659 return 0;
1664 struct ctdb_client_notify_list {
1665 struct ctdb_client_notify_list *next, *prev;
1666 struct ctdb_context *ctdb;
1667 uint64_t srvid;
1668 TDB_DATA data;
1672 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1674 int ret;
1676 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1678 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1679 if (ret != 0) {
1680 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1683 return 0;
1686 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1688 struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1689 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1690 struct ctdb_client_notify_list *nl;
1692 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1694 if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1695 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1696 return -1;
1699 if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1700 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1701 return -1;
1705 if (client == NULL) {
1706 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1707 return -1;
1710 for(nl=client->notify; nl; nl=nl->next) {
1711 if (nl->srvid == notify->srvid) {
1712 break;
1715 if (nl != NULL) {
1716 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1717 return -1;
1720 nl = talloc(client, struct ctdb_client_notify_list);
1721 CTDB_NO_MEMORY(ctdb, nl);
1722 nl->ctdb = ctdb;
1723 nl->srvid = notify->srvid;
1724 nl->data.dsize = notify->len;
1725 nl->data.dptr = talloc_memdup(nl, notify->notify_data,
1726 nl->data.dsize);
1727 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1729 DLIST_ADD(client->notify, nl);
1730 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1732 return 0;
1735 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1737 uint64_t srvid = *(uint64_t *)indata.dptr;
1738 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1739 struct ctdb_client_notify_list *nl;
1741 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1743 if (client == NULL) {
1744 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1745 return -1;
1748 for(nl=client->notify; nl; nl=nl->next) {
1749 if (nl->srvid == srvid) {
1750 break;
1753 if (nl == NULL) {
1754 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1755 return -1;
1758 DLIST_REMOVE(client->notify, nl);
1759 talloc_set_destructor(nl, NULL);
1760 talloc_free(nl);
1762 return 0;
1765 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1767 struct ctdb_client_pid_list *client_pid;
1769 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1770 if (client_pid->pid == pid) {
1771 return client_pid->client;
1774 return NULL;
1778 /* This control is used by samba when probing if a process (of a samba daemon)
1779 exists on the node.
1780 Samba does this when it needs/wants to check if a subrecord in one of the
1781 databases is still valied, or if it is stale and can be removed.
1782 If the node is in unhealthy or stopped state we just kill of the samba
1783 process holding htis sub-record and return to the calling samba that
1784 the process does not exist.
1785 This allows us to forcefully recall subrecords registered by samba processes
1786 on banned and stopped nodes.
1788 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1790 struct ctdb_client *client;
1792 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1793 client = ctdb_find_client_by_pid(ctdb, pid);
1794 if (client != NULL) {
1795 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1796 talloc_free(client);
1798 return -1;
1801 return kill(pid, 0);
1804 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1806 struct ctdb_node_map_old *node_map = NULL;
1808 CHECK_CONTROL_DATA_SIZE(0);
1810 node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1811 if (node_map == NULL) {
1812 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1813 return -1;
1816 outdata->dptr = (unsigned char *)node_map;
1817 outdata->dsize = talloc_get_size(outdata->dptr);
1819 return 0;
1822 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1824 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1825 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1826 return;
1829 DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1830 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1831 ctdb_stop_recoverd(ctdb);
1832 ctdb_stop_keepalive(ctdb);
1833 ctdb_stop_monitoring(ctdb);
1834 ctdb_release_all_ips(ctdb);
1835 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1836 if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1837 ctdb->methods->shutdown(ctdb);
1840 DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1841 exit(exit_code);
1844 /* When forking the main daemon and the child process needs to connect
1845 * back to the daemon as a client process, this function can be used
1846 * to change the ctdb context from daemon into client mode. The child
1847 * process must be created using ctdb_fork() and not fork() -
1848 * ctdb_fork() does some necessary housekeeping.
1850 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
1852 int ret;
1853 va_list ap;
1855 /* Add extra information so we can identify this in the logs */
1856 va_start(ap, fmt);
1857 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
1858 va_end(ap);
1860 /* get a new event context */
1861 ctdb->ev = tevent_context_init(ctdb);
1862 if (ctdb->ev == NULL) {
1863 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1864 exit(1);
1866 tevent_loop_allow_nesting(ctdb->ev);
1868 /* Connect to main CTDB daemon */
1869 ret = ctdb_socket_connect(ctdb);
1870 if (ret != 0) {
1871 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1872 return -1;
1875 ctdb->can_send_controls = true;
1877 return 0;