autobuild: Use make -j on samba-libs/samba-static build as well
[Samba.git] / ctdb / server / ctdb_daemon.c
blob36dcfad859021425dc60a3ffd9527eb4209f58f8
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
37 #include "ctdb_version.h"
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
41 #include "common/rb_tree.h"
42 #include "common/reqid.h"
43 #include "common/system.h"
44 #include "common/common.h"
45 #include "common/logging.h"
/*
 * List element tying a connected client's pid to its ctdb_client.
 * Entries are linked into ctdb->client_pids with the DLIST macros.
 */
struct ctdb_client_pid_list {
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	struct ctdb_context *ctdb;	/* context whose client_pids list holds us */
	pid_t pid;			/* peer pid, 0 if it could not be determined */
	struct ctdb_client *client;	/* the client this entry belongs to */
};
/* Path of the daemon's PID file; when NULL no PID file is created or removed. */
const char *ctdbd_pidfile = NULL;

/* Defined further down; needed early by the deferred/requeue paths. */
static void daemon_incoming_packet(void *, struct ctdb_req_header *);
58 static void print_exit_message(void)
60 if (debug_extra != NULL && debug_extra[0] != '\0') {
61 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
62 } else {
63 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
65 /* Wait a second to allow pending log messages to be flushed */
66 sleep(1);
72 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
73 struct timeval t, void *private_data)
75 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
77 if (getpid() != ctdb->ctdbd_pid) {
78 return;
81 tevent_add_timer(ctdb->ev, ctdb,
82 timeval_current_ofs(1, 0),
83 ctdb_time_tick, ctdb);
86 /* Used to trigger a dummy event once per second, to make
87 * detection of hangs more reliable.
89 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
91 tevent_add_timer(ctdb->ev, ctdb,
92 timeval_current_ofs(1, 0),
93 ctdb_time_tick, ctdb);
/*
 * Kick off the daemon's recurring activities, in this order:
 * keepalives, tcp tickle list updates, recovery daemon ping handling
 * and the once-per-second timer tick.
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* monitoring for connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* periodic update of tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* listening for recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* listening to timer ticks */
	ctdb_start_time_tickd(ctdb);
}
111 static void ignore_signal(int signum)
113 struct sigaction act;
115 memset(&act, 0, sizeof(act));
117 act.sa_handler = SIG_IGN;
118 sigemptyset(&act.sa_mask);
119 sigaddset(&act.sa_mask, signum);
120 sigaction(signum, &act, NULL);
125 send a packet to a client
127 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
130 if (hdr->operation == CTDB_REQ_MESSAGE) {
131 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
132 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
133 talloc_free(client);
134 return -1;
137 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
141 message handler for when we are in daemon mode. This redirects the message
142 to the right client
144 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
145 void *private_data)
147 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
148 struct ctdb_req_message_old *r;
149 int len;
151 /* construct a message to send to the client containing the data */
152 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
153 r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
154 len, struct ctdb_req_message_old);
155 CTDB_NO_MEMORY_VOID(client->ctdb, r);
157 talloc_set_name_const(r, "req_message packet");
159 r->srvid = srvid;
160 r->datalen = data.dsize;
161 memcpy(&r->data[0], data.dptr, data.dsize);
163 daemon_queue_send(client, &r->hdr);
165 talloc_free(r);
169 this is called when the ctdb daemon received a ctdb request to
170 set the srvid from the client
172 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
175 int res;
176 if (client == NULL) {
177 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
178 return -1;
180 res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
181 client);
182 if (res != 0) {
183 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
184 (unsigned long long)srvid));
185 } else {
186 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
187 (unsigned long long)srvid));
190 return res;
194 this is called when the ctdb daemon received a ctdb request to
195 remove a srvid from the client
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
200 if (client == NULL) {
201 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202 return -1;
204 return srvid_deregister(ctdb->srv, srvid, client);
207 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
208 TDB_DATA *outdata)
210 uint64_t *ids;
211 int i, num_ids;
212 uint8_t *results;
214 if ((indata.dsize % sizeof(uint64_t)) != 0) {
215 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
216 "size=%d\n", (int)indata.dsize));
217 return -1;
220 ids = (uint64_t *)indata.dptr;
221 num_ids = indata.dsize / 8;
223 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
224 if (results == NULL) {
225 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
226 return -1;
228 for (i=0; i<num_ids; i++) {
229 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
230 results[i/8] |= (1 << (i%8));
233 outdata->dptr = (uint8_t *)results;
234 outdata->dsize = talloc_get_size(results);
235 return 0;
239 destroy a ctdb_client
241 static int ctdb_client_destructor(struct ctdb_client *client)
243 struct ctdb_db_context *ctdb_db;
245 ctdb_takeover_client_destructor_hook(client);
246 reqid_remove(client->ctdb->idr, client->client_id);
247 client->ctdb->num_clients--;
249 if (client->num_persistent_updates != 0) {
250 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
251 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
253 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
254 if (ctdb_db) {
255 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
256 "commit active. Forcing recovery.\n"));
257 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
260 * trans3 transaction state:
262 * The destructor sets the pointer to NULL.
264 talloc_free(ctdb_db->persistent_state);
267 return 0;
272 this is called when the ctdb daemon received a ctdb request message
273 from a local client over the unix domain socket
275 static void daemon_request_message_from_client(struct ctdb_client *client,
276 struct ctdb_req_message_old *c)
278 TDB_DATA data;
279 int res;
281 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
282 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
285 /* maybe the message is for another client on this node */
286 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
287 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
288 return;
291 /* its for a remote node */
292 data.dptr = &c->data[0];
293 data.dsize = c->datalen;
294 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
295 c->srvid, data);
296 if (res != 0) {
297 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
298 c->hdr.destnode));
/* Per-request state kept while a client's call is being processed. */
struct daemon_call_state {
	struct ctdb_client *client;	/* client that issued the call */
	uint32_t reqid;			/* reqid copied from the client's request header */
	struct ctdb_call *call;		/* the call being executed */
	struct timeval start_time;	/* used for call latency statistics */

	/* readonly request ? */
	uint32_t readonly_fetch;
	/* call id the client originally asked for; set for readonly requests */
	uint32_t client_callid;
};
315 complete a call from a client
317 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
319 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
320 struct daemon_call_state);
321 struct ctdb_reply_call_old *r;
322 int res;
323 uint32_t length;
324 struct ctdb_client *client = dstate->client;
325 struct ctdb_db_context *ctdb_db = state->ctdb_db;
327 talloc_steal(client, dstate);
328 talloc_steal(dstate, dstate->call);
330 res = ctdb_daemon_call_recv(state, dstate->call);
331 if (res != 0) {
332 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
333 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
335 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
336 return;
339 length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
340 /* If the client asked for readonly FETCH, we remapped this to
341 FETCH_WITH_HEADER when calling the daemon. So we must
342 strip the extra header off the reply data before passing
343 it back to the client.
345 if (dstate->readonly_fetch
346 && dstate->client_callid == CTDB_FETCH_FUNC) {
347 length -= sizeof(struct ctdb_ltdb_header);
350 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
351 length, struct ctdb_reply_call_old);
352 if (r == NULL) {
353 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
354 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
355 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
356 return;
358 r->hdr.reqid = dstate->reqid;
359 r->status = dstate->call->status;
361 if (dstate->readonly_fetch
362 && dstate->client_callid == CTDB_FETCH_FUNC) {
363 /* client only asked for a FETCH so we must strip off
364 the extra ctdb_ltdb header
366 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
367 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
368 } else {
369 r->datalen = dstate->call->reply_data.dsize;
370 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
373 res = daemon_queue_send(client, &r->hdr);
374 if (res == -1) {
375 /* client is dead - return immediately */
376 return;
378 if (res != 0) {
379 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
381 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
382 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
383 talloc_free(dstate);
/*
 * Identifies a client by id rather than by pointer, so that a packet
 * handed back later can detect that the client has disconnected.
 */
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;	/* context whose idr maps client_id */
	uint32_t client_id;		/* reqid of the originating client */
};
392 a wrapper to catch disconnected clients
394 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
396 struct ctdb_client *client;
397 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
398 struct ctdb_daemon_packet_wrap);
399 if (w == NULL) {
400 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
401 return;
404 client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
405 if (client == NULL) {
406 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
407 w->client_id));
408 talloc_free(w);
409 return;
411 talloc_free(w);
413 /* process it */
414 daemon_incoming_packet(client, hdr);
/* One deferred client request, queued on a ctdb_deferred_fetch_queue. */
struct ctdb_deferred_fetch_call {
	struct ctdb_deferred_fetch_call *next;
	struct ctdb_deferred_fetch_call *prev;
	struct ctdb_req_call_old *c;		/* the deferred request packet */
	struct ctdb_daemon_packet_wrap *w;	/* identifies the issuing client */
};
/* Requests deferred behind one in-flight fetch, kept in arrival order. */
struct ctdb_deferred_fetch_queue {
	struct ctdb_deferred_fetch_call *deferred_calls;
};
/* Timer payload used to push a deferred call back through the event loop. */
struct ctdb_deferred_requeue {
	struct ctdb_deferred_fetch_call *dfc;	/* the call to reprocess */
	struct ctdb_client *client;		/* client that issued it */
};
432 /* called from a timer event and starts reprocessing the deferred call.*/
433 static void reprocess_deferred_call(struct tevent_context *ev,
434 struct tevent_timer *te,
435 struct timeval t, void *private_data)
437 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
438 struct ctdb_client *client = dfr->client;
440 talloc_steal(client, dfr->dfc->c);
441 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
442 talloc_free(dfr);
445 /* the referral context is destroyed either after a timeout or when the initial
446 fetch-lock has finished.
447 at this stage, immediately start reprocessing the queued up deferred
448 calls so they get reprocessed immediately (and since we are dmaster at
449 this stage, trigger the waiting smbd processes to pick up and aquire the
450 record right away.
452 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
455 /* need to reprocess the packets from the queue explicitely instead of
456 just using a normal destructor since we want, need, to
457 call the clients in the same oder as the requests queued up
459 while (dfq->deferred_calls != NULL) {
460 struct ctdb_client *client;
461 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
462 struct ctdb_deferred_requeue *dfr;
464 DLIST_REMOVE(dfq->deferred_calls, dfc);
466 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
467 if (client == NULL) {
468 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
469 dfc->w->client_id));
470 continue;
473 /* process it by pushing it back onto the eventloop */
474 dfr = talloc(client, struct ctdb_deferred_requeue);
475 if (dfr == NULL) {
476 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
477 continue;
480 dfr->dfc = talloc_steal(dfr, dfc);
481 dfr->client = client;
483 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
484 reprocess_deferred_call, dfr);
487 return 0;
490 /* insert the new deferral context into the rb tree.
491 there should never be a pre-existing context here, but check for it
492 warn and destroy the previous context if there is already a deferral context
493 for this key.
495 static void *insert_dfq_callback(void *parm, void *data)
497 if (data) {
498 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
499 talloc_free(data);
501 return parm;
/*
 * If the original fetch-lock did not complete within a reasonable time,
 * free the deferral context (passed as private_data).  Its destructor
 * then re-inserts all deferred requests into the event system.
 */
static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
			struct timeval t, void *private_data)
{
	void *dfq = private_data;

	talloc_free(dfq);
}
514 /* This function is used in the local daemon to register a KEY in a database
515 for being "fetched"
516 While the remote fetch is in-flight, any futher attempts to re-fetch the
517 same record will be deferred until the fetch completes.
519 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
521 uint32_t *k;
522 struct ctdb_deferred_fetch_queue *dfq;
524 k = ctdb_key_to_idkey(call, call->key);
525 if (k == NULL) {
526 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
527 return -1;
530 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
531 if (dfq == NULL) {
532 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
533 talloc_free(k);
534 return -1;
536 dfq->deferred_calls = NULL;
538 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
540 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
542 /* if the fetch havent completed in 30 seconds, just tear it all down
543 and let it try again as the events are reissued */
544 tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
545 dfq_timeout, dfq);
547 talloc_free(k);
548 return 0;
551 /* check if this is a duplicate request to a fetch already in-flight
552 if it is, make this call deferred to be reprocessed later when
553 the in-flight fetch completes.
555 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
557 uint32_t *k;
558 struct ctdb_deferred_fetch_queue *dfq;
559 struct ctdb_deferred_fetch_call *dfc;
561 k = ctdb_key_to_idkey(c, key);
562 if (k == NULL) {
563 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
564 return -1;
567 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
568 if (dfq == NULL) {
569 talloc_free(k);
570 return -1;
574 talloc_free(k);
576 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
577 if (dfc == NULL) {
578 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
579 return -1;
582 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
583 if (dfc->w == NULL) {
584 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
585 talloc_free(dfc);
586 return -1;
589 dfc->c = talloc_steal(dfc, c);
590 dfc->w->ctdb = ctdb_db->ctdb;
591 dfc->w->client_id = client->client_id;
593 DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
595 return 0;
600 this is called when the ctdb daemon received a ctdb request call
601 from a local client over the unix domain socket
603 static void daemon_request_call_from_client(struct ctdb_client *client,
604 struct ctdb_req_call_old *c)
606 struct ctdb_call_state *state;
607 struct ctdb_db_context *ctdb_db;
608 struct daemon_call_state *dstate;
609 struct ctdb_call *call;
610 struct ctdb_ltdb_header header;
611 TDB_DATA key, data;
612 int ret;
613 struct ctdb_context *ctdb = client->ctdb;
614 struct ctdb_daemon_packet_wrap *w;
616 CTDB_INCREMENT_STAT(ctdb, total_calls);
617 CTDB_INCREMENT_STAT(ctdb, pending_calls);
619 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
620 if (!ctdb_db) {
621 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
622 c->db_id));
623 CTDB_DECREMENT_STAT(ctdb, pending_calls);
624 return;
627 if (ctdb_db->unhealthy_reason) {
629 * this is just a warning, as the tdb should be empty anyway,
630 * and only persistent databases can be unhealthy, which doesn't
631 * use this code patch
633 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
634 ctdb_db->db_name, ctdb_db->unhealthy_reason));
637 key.dptr = c->data;
638 key.dsize = c->keylen;
640 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
641 CTDB_NO_MEMORY_VOID(ctdb, w);
643 w->ctdb = ctdb;
644 w->client_id = client->client_id;
646 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
647 (struct ctdb_req_header *)c, &data,
648 daemon_incoming_packet_wrap, w, true);
649 if (ret == -2) {
650 /* will retry later */
651 CTDB_DECREMENT_STAT(ctdb, pending_calls);
652 return;
655 talloc_free(w);
657 if (ret != 0) {
658 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
659 CTDB_DECREMENT_STAT(ctdb, pending_calls);
660 return;
664 /* check if this fetch request is a duplicate for a
665 request we already have in flight. If so defer it until
666 the first request completes.
668 if (ctdb->tunable.fetch_collapse == 1) {
669 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
670 ret = ctdb_ltdb_unlock(ctdb_db, key);
671 if (ret != 0) {
672 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
674 CTDB_DECREMENT_STAT(ctdb, pending_calls);
675 return;
679 /* Dont do READONLY if we don't have a tracking database */
680 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
681 c->flags &= ~CTDB_WANT_READONLY;
684 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
685 header.flags &= ~CTDB_REC_RO_FLAGS;
686 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
687 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
688 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
689 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
691 /* and clear out the tracking data */
692 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
693 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
697 /* if we are revoking, we must defer all other calls until the revoke
698 * had completed.
700 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
701 talloc_free(data.dptr);
702 ret = ctdb_ltdb_unlock(ctdb_db, key);
704 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
705 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
707 CTDB_DECREMENT_STAT(ctdb, pending_calls);
708 return;
711 if ((header.dmaster == ctdb->pnn)
712 && (!(c->flags & CTDB_WANT_READONLY))
713 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
714 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
715 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
716 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
718 ret = ctdb_ltdb_unlock(ctdb_db, key);
720 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
721 ctdb_fatal(ctdb, "Failed to start record revoke");
723 talloc_free(data.dptr);
725 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
726 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
729 CTDB_DECREMENT_STAT(ctdb, pending_calls);
730 return;
733 dstate = talloc(client, struct daemon_call_state);
734 if (dstate == NULL) {
735 ret = ctdb_ltdb_unlock(ctdb_db, key);
736 if (ret != 0) {
737 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
740 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
741 CTDB_DECREMENT_STAT(ctdb, pending_calls);
742 return;
744 dstate->start_time = timeval_current();
745 dstate->client = client;
746 dstate->reqid = c->hdr.reqid;
747 talloc_steal(dstate, data.dptr);
749 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
750 if (call == NULL) {
751 ret = ctdb_ltdb_unlock(ctdb_db, key);
752 if (ret != 0) {
753 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
756 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
757 CTDB_DECREMENT_STAT(ctdb, pending_calls);
758 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
759 return;
762 dstate->readonly_fetch = 0;
763 call->call_id = c->callid;
764 call->key = key;
765 call->call_data.dptr = c->data + c->keylen;
766 call->call_data.dsize = c->calldatalen;
767 call->flags = c->flags;
769 if (c->flags & CTDB_WANT_READONLY) {
770 /* client wants readonly record, so translate this into a
771 fetch with header. remember what the client asked for
772 so we can remap the reply back to the proper format for
773 the client in the reply
775 dstate->client_callid = call->call_id;
776 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
777 dstate->readonly_fetch = 1;
780 if (header.dmaster == ctdb->pnn) {
781 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
782 } else {
783 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
784 if (ctdb->tunable.fetch_collapse == 1) {
785 /* This request triggered a remote fetch-lock.
786 set up a deferral for this key so any additional
787 fetch-locks are deferred until the current one
788 finishes.
790 setup_deferred_fetch_locks(ctdb_db, call);
794 ret = ctdb_ltdb_unlock(ctdb_db, key);
795 if (ret != 0) {
796 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
799 if (state == NULL) {
800 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
801 CTDB_DECREMENT_STAT(ctdb, pending_calls);
802 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
803 return;
805 talloc_steal(state, dstate);
806 talloc_steal(client, state);
808 state->async.fn = daemon_call_from_client_callback;
809 state->async.private_data = dstate;
813 static void daemon_request_control_from_client(struct ctdb_client *client,
814 struct ctdb_req_control_old *c);
816 /* data contains a packet from the client */
817 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
819 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
820 TALLOC_CTX *tmp_ctx;
821 struct ctdb_context *ctdb = client->ctdb;
823 /* place the packet as a child of a tmp_ctx. We then use
824 talloc_free() below to free it. If any of the calls want
825 to keep it, then they will steal it somewhere else, and the
826 talloc_free() will be a no-op */
827 tmp_ctx = talloc_new(client);
828 talloc_steal(tmp_ctx, hdr);
830 if (hdr->ctdb_magic != CTDB_MAGIC) {
831 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
832 goto done;
835 if (hdr->ctdb_version != CTDB_PROTOCOL) {
836 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
837 goto done;
840 switch (hdr->operation) {
841 case CTDB_REQ_CALL:
842 CTDB_INCREMENT_STAT(ctdb, client.req_call);
843 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
844 break;
846 case CTDB_REQ_MESSAGE:
847 CTDB_INCREMENT_STAT(ctdb, client.req_message);
848 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
849 break;
851 case CTDB_REQ_CONTROL:
852 CTDB_INCREMENT_STAT(ctdb, client.req_control);
853 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
854 break;
856 default:
857 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
858 hdr->operation));
861 done:
862 talloc_free(tmp_ctx);
866 called when the daemon gets a incoming packet
868 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
870 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
871 struct ctdb_req_header *hdr;
873 if (cnt == 0) {
874 talloc_free(client);
875 return;
878 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
880 if (cnt < sizeof(*hdr)) {
881 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
882 (unsigned)cnt);
883 return;
885 hdr = (struct ctdb_req_header *)data;
886 if (cnt != hdr->length) {
887 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
888 (unsigned)hdr->length, (unsigned)cnt);
889 return;
892 if (hdr->ctdb_magic != CTDB_MAGIC) {
893 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
894 return;
897 if (hdr->ctdb_version != CTDB_PROTOCOL) {
898 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
899 return;
902 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
903 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
904 hdr->srcnode, hdr->destnode));
906 /* it is the responsibility of the incoming packet function to free 'data' */
907 daemon_incoming_packet(client, hdr);
911 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
913 if (client_pid->ctdb->client_pids != NULL) {
914 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
917 return 0;
921 static void ctdb_accept_client(struct tevent_context *ev,
922 struct tevent_fd *fde, uint16_t flags,
923 void *private_data)
925 struct sockaddr_un addr;
926 socklen_t len;
927 int fd;
928 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
929 struct ctdb_client *client;
930 struct ctdb_client_pid_list *client_pid;
931 pid_t peer_pid = 0;
933 memset(&addr, 0, sizeof(addr));
934 len = sizeof(addr);
935 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
936 if (fd == -1) {
937 return;
940 set_nonblocking(fd);
941 set_close_on_exec(fd);
943 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
945 client = talloc_zero(ctdb, struct ctdb_client);
946 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
947 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
950 client->ctdb = ctdb;
951 client->fd = fd;
952 client->client_id = reqid_new(ctdb->idr, client);
953 client->pid = peer_pid;
955 client_pid = talloc(client, struct ctdb_client_pid_list);
956 if (client_pid == NULL) {
957 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
958 close(fd);
959 talloc_free(client);
960 return;
962 client_pid->ctdb = ctdb;
963 client_pid->pid = peer_pid;
964 client_pid->client = client;
966 DLIST_ADD(ctdb->client_pids, client_pid);
968 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
969 ctdb_daemon_read_cb, client,
970 "client-%u", client->pid);
972 talloc_set_destructor(client, ctdb_client_destructor);
973 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
974 ctdb->num_clients++;
980 create a unix domain socket and bind it
981 return a file descriptor open on the socket
983 static int ux_socket_bind(struct ctdb_context *ctdb)
985 struct sockaddr_un addr;
987 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
988 if (ctdb->daemon.sd == -1) {
989 return -1;
992 memset(&addr, 0, sizeof(addr));
993 addr.sun_family = AF_UNIX;
994 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
996 /* First check if an old ctdbd might be running */
997 if (connect(ctdb->daemon.sd,
998 (struct sockaddr *)&addr, sizeof(addr)) == 0) {
999 DEBUG(DEBUG_CRIT,
1000 ("Something is already listening on ctdb socket '%s'\n",
1001 ctdb->daemon.name));
1002 goto failed;
1005 /* Remove any old socket */
1006 unlink(ctdb->daemon.name);
1008 set_close_on_exec(ctdb->daemon.sd);
1009 set_nonblocking(ctdb->daemon.sd);
1011 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1012 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1013 goto failed;
1016 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1017 chmod(ctdb->daemon.name, 0700) != 0) {
1018 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1019 goto failed;
1023 if (listen(ctdb->daemon.sd, 100) != 0) {
1024 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1025 goto failed;
1028 return 0;
1030 failed:
1031 close(ctdb->daemon.sd);
1032 ctdb->daemon.sd = -1;
1033 return -1;
1036 static void initialise_node_flags (struct ctdb_context *ctdb)
1038 if (ctdb->pnn == -1) {
1039 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1042 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1044 /* do we start out in DISABLED mode? */
1045 if (ctdb->start_as_disabled != 0) {
1046 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1047 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1049 /* do we start out in STOPPED mode? */
1050 if (ctdb->start_as_stopped != 0) {
1051 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1052 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1056 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1057 void *private_data)
1059 if (status != 0) {
1060 ctdb_die(ctdb, "Failed to run setup event");
1062 ctdb_run_notification_script(ctdb, "setup");
1064 /* tell all other nodes we've just started up */
1065 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1066 0, CTDB_CONTROL_STARTUP, 0,
1067 CTDB_CTRL_FLAG_NOREPLY,
1068 tdb_null, NULL, NULL);
1070 /* Start the recovery daemon */
1071 if (ctdb_start_recoverd(ctdb) != 0) {
1072 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1073 exit(11);
1076 ctdb_start_periodic_events(ctdb);
1078 ctdb_wait_for_first_recovery(ctdb);
1081 static struct timeval tevent_before_wait_ts;
1082 static struct timeval tevent_after_wait_ts;
1084 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1085 void *private_data)
1087 struct timeval diff;
1088 struct timeval now;
1089 struct ctdb_context *ctdb =
1090 talloc_get_type(private_data, struct ctdb_context);
1092 if (getpid() != ctdb->ctdbd_pid) {
1093 return;
1096 now = timeval_current();
1098 switch (tp) {
1099 case TEVENT_TRACE_BEFORE_WAIT:
1100 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1101 diff = timeval_until(&tevent_after_wait_ts, &now);
1102 if (diff.tv_sec > 3) {
1103 DEBUG(DEBUG_ERR,
1104 ("Handling event took %ld seconds!\n",
1105 (long)diff.tv_sec));
1108 tevent_before_wait_ts = now;
1109 break;
1111 case TEVENT_TRACE_AFTER_WAIT:
1112 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1113 diff = timeval_until(&tevent_before_wait_ts, &now);
1114 if (diff.tv_sec > 3) {
1115 DEBUG(DEBUG_CRIT,
1116 ("No event for %ld seconds!\n",
1117 (long)diff.tv_sec));
1120 tevent_after_wait_ts = now;
1121 break;
1123 default:
1124 /* Do nothing for future tevent trace points */ ;
1128 static void ctdb_remove_pidfile(void)
1130 /* Only the main ctdbd's PID matches the SID */
1131 if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1132 if (unlink(ctdbd_pidfile) == 0) {
1133 DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1134 ctdbd_pidfile));
1135 } else {
1136 DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1137 ctdbd_pidfile));
1142 static void ctdb_create_pidfile(pid_t pid)
1144 if (ctdbd_pidfile != NULL) {
1145 FILE *fp;
1147 fp = fopen(ctdbd_pidfile, "w");
1148 if (fp == NULL) {
1149 DEBUG(DEBUG_ALERT,
1150 ("Failed to open PID file %s\n", ctdbd_pidfile));
1151 exit(11);
1154 fprintf(fp, "%d\n", pid);
1155 fclose(fp);
1156 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1157 atexit(ctdb_remove_pidfile);
1161 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1163 int i, j, count;
1165 /* initialize the vnn mapping table, skipping any deleted nodes */
1166 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1167 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1169 count = 0;
1170 for (i = 0; i < ctdb->num_nodes; i++) {
1171 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1172 count++;
1176 ctdb->vnn_map->generation = INVALID_GENERATION;
1177 ctdb->vnn_map->size = count;
1178 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1179 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1181 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1182 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1183 continue;
1185 ctdb->vnn_map->map[j] = i;
1186 j++;
1190 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1192 int nodeid;
1194 if (ctdb->address == NULL) {
1195 ctdb_fatal(ctdb,
1196 "Can not determine PNN - node address is not set\n");
1199 nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1200 if (nodeid == -1) {
1201 ctdb_fatal(ctdb,
1202 "Can not determine PNN - node address not found in node list\n");
1205 ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1206 DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1210 start the protocol going as a daemon
1212 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1214 int res, ret = -1;
1215 struct tevent_fd *fde;
1217 /* create a unix domain stream socket to listen to */
1218 res = ux_socket_bind(ctdb);
1219 if (res!=0) {
1220 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1221 exit(10);
1224 if (do_fork && fork()) {
1225 return 0;
1228 tdb_reopen_all(false);
1230 if (do_fork) {
1231 if (setsid() == -1) {
1232 ctdb_die(ctdb, "Failed to setsid()\n");
1234 close(0);
1235 if (open("/dev/null", O_RDONLY) != 0) {
1236 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1237 exit(11);
1240 ignore_signal(SIGPIPE);
1241 ignore_signal(SIGUSR1);
1243 ctdb->ctdbd_pid = getpid();
1244 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1245 CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1246 ctdb_create_pidfile(ctdb->ctdbd_pid);
1248 /* Make sure we log something when the daemon terminates.
1249 * This must be the first exit handler to run (so the last to
1250 * be registered.
1252 atexit(print_exit_message);
1254 if (ctdb->do_setsched) {
1255 /* try to set us up as realtime */
1256 if (!set_scheduler()) {
1257 exit(1);
1259 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1262 ctdb->ev = tevent_context_init(NULL);
1263 tevent_loop_allow_nesting(ctdb->ev);
1264 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1265 ret = ctdb_init_tevent_logging(ctdb);
1266 if (ret != 0) {
1267 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1268 exit(1);
1271 /* set up a handler to pick up sigchld */
1272 if (ctdb_init_sigchld(ctdb) == NULL) {
1273 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1274 exit(1);
1277 ctdb_set_child_logging(ctdb);
1279 if (srvid_init(ctdb, &ctdb->srv) != 0) {
1280 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1281 exit(1);
1284 /* initialize statistics collection */
1285 ctdb_statistics_init(ctdb);
1287 /* force initial recovery for election */
1288 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1290 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1291 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1292 if (ret != 0) {
1293 ctdb_die(ctdb, "Failed to run init event\n");
1295 ctdb_run_notification_script(ctdb, "init");
1297 if (strcmp(ctdb->transport, "tcp") == 0) {
1298 ret = ctdb_tcp_init(ctdb);
1300 #ifdef USE_INFINIBAND
1301 if (strcmp(ctdb->transport, "ib") == 0) {
1302 ret = ctdb_ibw_init(ctdb);
1304 #endif
1305 if (ret != 0) {
1306 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1307 return -1;
1310 if (ctdb->methods == NULL) {
1311 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1312 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1315 /* Initialise the transport. This sets the node address if it
1316 * was not set via the command-line. */
1317 if (ctdb->methods->initialise(ctdb) != 0) {
1318 ctdb_fatal(ctdb, "transport failed to initialise");
1321 ctdb_set_my_pnn(ctdb);
1323 initialise_node_flags(ctdb);
1325 if (ctdb->public_addresses_file) {
1326 ret = ctdb_set_public_addresses(ctdb, true);
1327 if (ret == -1) {
1328 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1329 exit(1);
1333 ctdb_initialise_vnn_map(ctdb);
1335 /* attach to existing databases */
1336 if (ctdb_attach_databases(ctdb) != 0) {
1337 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1340 /* start frozen, then let the first election sort things out */
1341 if (!ctdb_blocking_freeze(ctdb)) {
1342 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1345 /* now start accepting clients, only can do this once frozen */
1346 fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1347 ctdb_accept_client, ctdb);
1348 if (fde == NULL) {
1349 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1351 tevent_fd_set_auto_close(fde);
1353 /* Start the transport */
1354 if (ctdb->methods->start(ctdb) != 0) {
1355 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1356 ctdb_fatal(ctdb, "transport failed to start");
1359 /* Recovery daemon and timed events are started from the
1360 * callback, only after the setup event completes
1361 * successfully.
1363 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1364 ret = ctdb_event_script_callback(ctdb,
1365 ctdb,
1366 ctdb_setup_event_callback,
1367 ctdb,
1368 CTDB_EVENT_SETUP,
1369 "%s",
1370 "");
1371 if (ret != 0) {
1372 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1373 exit(1);
1376 lockdown_memory(ctdb->valgrinding);
1378 /* go into a wait loop to allow other nodes to complete */
1379 tevent_loop_wait(ctdb->ev);
1381 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1382 exit(1);
1386 allocate a packet for use in daemon<->daemon communication
1388 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1389 TALLOC_CTX *mem_ctx,
1390 enum ctdb_operation operation,
1391 size_t length, size_t slength,
1392 const char *type)
1394 int size;
1395 struct ctdb_req_header *hdr;
1397 length = MAX(length, slength);
1398 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1400 if (ctdb->methods == NULL) {
1401 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1402 operation, (unsigned)length));
1403 return NULL;
1406 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1407 if (hdr == NULL) {
1408 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1409 operation, (unsigned)length));
1410 return NULL;
1412 talloc_set_name_const(hdr, type);
1413 memset(hdr, 0, slength);
1414 hdr->length = length;
1415 hdr->operation = operation;
1416 hdr->ctdb_magic = CTDB_MAGIC;
1417 hdr->ctdb_version = CTDB_PROTOCOL;
1418 hdr->generation = ctdb->vnn_map->generation;
1419 hdr->srcnode = ctdb->pnn;
1421 return hdr;
/*
  state kept for a control that a local client asked us to send
*/
struct daemon_control_state {
	/* linkage in the destination node's pending_controls list */
	struct daemon_control_state *next;
	struct daemon_control_state *prev;
	/* the client that issued the control */
	struct ctdb_client *client;
	/* the request packet, owned by this state */
	struct ctdb_req_control_old *c;
	/* request id copied from the client's packet, echoed in the reply */
	uint32_t reqid;
	/* destination node, or NULL if the destination is not a valid PNN */
	struct ctdb_node *node;
};
1433 callback when a control reply comes in
1435 static void daemon_control_callback(struct ctdb_context *ctdb,
1436 int32_t status, TDB_DATA data,
1437 const char *errormsg,
1438 void *private_data)
1440 struct daemon_control_state *state = talloc_get_type(private_data,
1441 struct daemon_control_state);
1442 struct ctdb_client *client = state->client;
1443 struct ctdb_reply_control_old *r;
1444 size_t len;
1445 int ret;
1447 /* construct a message to send to the client containing the data */
1448 len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1449 if (errormsg) {
1450 len += strlen(errormsg);
1452 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1453 struct ctdb_reply_control_old);
1454 CTDB_NO_MEMORY_VOID(ctdb, r);
1456 r->hdr.reqid = state->reqid;
1457 r->status = status;
1458 r->datalen = data.dsize;
1459 r->errorlen = 0;
1460 memcpy(&r->data[0], data.dptr, data.dsize);
1461 if (errormsg) {
1462 r->errorlen = strlen(errormsg);
1463 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1466 ret = daemon_queue_send(client, &r->hdr);
1467 if (ret != -1) {
1468 talloc_free(state);
1473 fail all pending controls to a disconnected node
1475 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1477 struct daemon_control_state *state;
1478 while ((state = node->pending_controls)) {
1479 DLIST_REMOVE(node->pending_controls, state);
1480 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1481 "node is disconnected", state);
1486 destroy a daemon_control_state
1488 static int daemon_control_destructor(struct daemon_control_state *state)
1490 if (state->node) {
1491 DLIST_REMOVE(state->node->pending_controls, state);
1493 return 0;
1497 this is called when the ctdb daemon received a ctdb request control
1498 from a local client over the unix domain socket
1500 static void daemon_request_control_from_client(struct ctdb_client *client,
1501 struct ctdb_req_control_old *c)
1503 TDB_DATA data;
1504 int res;
1505 struct daemon_control_state *state;
1506 TALLOC_CTX *tmp_ctx = talloc_new(client);
1508 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1509 c->hdr.destnode = client->ctdb->pnn;
1512 state = talloc(client, struct daemon_control_state);
1513 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1515 state->client = client;
1516 state->c = talloc_steal(state, c);
1517 state->reqid = c->hdr.reqid;
1518 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1519 state->node = client->ctdb->nodes[c->hdr.destnode];
1520 DLIST_ADD(state->node->pending_controls, state);
1521 } else {
1522 state->node = NULL;
1525 talloc_set_destructor(state, daemon_control_destructor);
1527 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1528 talloc_steal(tmp_ctx, state);
1531 data.dptr = &c->data[0];
1532 data.dsize = c->datalen;
1533 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1534 c->srvid, c->opcode, client->client_id,
1535 c->flags,
1536 data, daemon_control_callback,
1537 state);
1538 if (res != 0) {
1539 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1540 c->hdr.destnode));
1543 talloc_free(tmp_ctx);
1547 register a call function
1549 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1550 ctdb_fn_t fn, int id)
1552 struct ctdb_registered_call *call;
1553 struct ctdb_db_context *ctdb_db;
1555 ctdb_db = find_ctdb_db(ctdb, db_id);
1556 if (ctdb_db == NULL) {
1557 return -1;
1560 call = talloc(ctdb_db, struct ctdb_registered_call);
1561 call->fn = fn;
1562 call->id = id;
1564 DLIST_ADD(ctdb_db->calls, call);
1565 return 0;
1571 this local messaging handler is ugly, but is needed to prevent
1572 recursion in ctdb_send_message() when the destination node is the
1573 same as the source node
1575 struct ctdb_local_message {
1576 struct ctdb_context *ctdb;
1577 uint64_t srvid;
1578 TDB_DATA data;
1581 static void ctdb_local_message_trigger(struct tevent_context *ev,
1582 struct tevent_timer *te,
1583 struct timeval t, void *private_data)
1585 struct ctdb_local_message *m = talloc_get_type(
1586 private_data, struct ctdb_local_message);
1588 srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1589 talloc_free(m);
1592 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1594 struct ctdb_local_message *m;
1595 m = talloc(ctdb, struct ctdb_local_message);
1596 CTDB_NO_MEMORY(ctdb, m);
1598 m->ctdb = ctdb;
1599 m->srvid = srvid;
1600 m->data = data;
1601 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1602 if (m->data.dptr == NULL) {
1603 talloc_free(m);
1604 return -1;
1607 /* this needs to be done as an event to prevent recursion */
1608 tevent_add_timer(ctdb->ev, m, timeval_zero(),
1609 ctdb_local_message_trigger, m);
1610 return 0;
1614 send a ctdb message
1616 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1617 uint64_t srvid, TDB_DATA data)
1619 struct ctdb_req_message_old *r;
1620 int len;
1622 if (ctdb->methods == NULL) {
1623 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1624 return -1;
1627 /* see if this is a message to ourselves */
1628 if (pnn == ctdb->pnn) {
1629 return ctdb_local_message(ctdb, srvid, data);
1632 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1633 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1634 struct ctdb_req_message_old);
1635 CTDB_NO_MEMORY(ctdb, r);
1637 r->hdr.destnode = pnn;
1638 r->srvid = srvid;
1639 r->datalen = data.dsize;
1640 memcpy(&r->data[0], data.dptr, data.dsize);
1642 ctdb_queue_packet(ctdb, &r->hdr);
1644 talloc_free(r);
1645 return 0;
1650 struct ctdb_client_notify_list {
1651 struct ctdb_client_notify_list *next, *prev;
1652 struct ctdb_context *ctdb;
1653 uint64_t srvid;
1654 TDB_DATA data;
1658 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1660 int ret;
1662 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1664 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1665 if (ret != 0) {
1666 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1669 return 0;
1672 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1674 struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1675 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1676 struct ctdb_client_notify_list *nl;
1678 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1680 if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1681 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1682 return -1;
1685 if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1686 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1687 return -1;
1691 if (client == NULL) {
1692 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1693 return -1;
1696 for(nl=client->notify; nl; nl=nl->next) {
1697 if (nl->srvid == notify->srvid) {
1698 break;
1701 if (nl != NULL) {
1702 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1703 return -1;
1706 nl = talloc(client, struct ctdb_client_notify_list);
1707 CTDB_NO_MEMORY(ctdb, nl);
1708 nl->ctdb = ctdb;
1709 nl->srvid = notify->srvid;
1710 nl->data.dsize = notify->len;
1711 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1712 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1713 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1715 DLIST_ADD(client->notify, nl);
1716 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1718 return 0;
1721 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1723 uint64_t srvid = *(uint64_t *)indata.dptr;
1724 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1725 struct ctdb_client_notify_list *nl;
1727 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1729 if (client == NULL) {
1730 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1731 return -1;
1734 for(nl=client->notify; nl; nl=nl->next) {
1735 if (nl->srvid == srvid) {
1736 break;
1739 if (nl == NULL) {
1740 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1741 return -1;
1744 DLIST_REMOVE(client->notify, nl);
1745 talloc_set_destructor(nl, NULL);
1746 talloc_free(nl);
1748 return 0;
1751 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1753 struct ctdb_client_pid_list *client_pid;
1755 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1756 if (client_pid->pid == pid) {
1757 return client_pid->client;
1760 return NULL;
1764 /* This control is used by samba when probing if a process (of a samba daemon)
1765 exists on the node.
1766 Samba does this when it needs/wants to check if a subrecord in one of the
1767 databases is still valied, or if it is stale and can be removed.
1768 If the node is in unhealthy or stopped state we just kill of the samba
1769 process holding htis sub-record and return to the calling samba that
1770 the process does not exist.
1771 This allows us to forcefully recall subrecords registered by samba processes
1772 on banned and stopped nodes.
1774 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1776 struct ctdb_client *client;
1778 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1779 client = ctdb_find_client_by_pid(ctdb, pid);
1780 if (client != NULL) {
1781 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1782 talloc_free(client);
1784 return -1;
1787 return kill(pid, 0);
1790 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1792 struct ctdb_node_map_old *node_map = NULL;
1794 CHECK_CONTROL_DATA_SIZE(0);
1796 node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1797 if (node_map == NULL) {
1798 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1799 return -1;
1802 outdata->dptr = (unsigned char *)node_map;
1803 outdata->dsize = talloc_get_size(outdata->dptr);
1805 return 0;
1808 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1810 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1811 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1812 return;
1815 DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1816 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1817 ctdb_stop_recoverd(ctdb);
1818 ctdb_stop_keepalive(ctdb);
1819 ctdb_stop_monitoring(ctdb);
1820 ctdb_release_all_ips(ctdb);
1821 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1822 if (ctdb->methods != NULL) {
1823 ctdb->methods->shutdown(ctdb);
1826 DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1827 exit(exit_code);
1830 /* When forking the main daemon and the child process needs to connect
1831 * back to the daemon as a client process, this function can be used
1832 * to change the ctdb context from daemon into client mode. The child
1833 * process must be created using ctdb_fork() and not fork() -
1834 * ctdb_fork() does some necessary housekeeping.
1836 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
1838 int ret;
1839 va_list ap;
1841 /* Add extra information so we can identify this in the logs */
1842 va_start(ap, fmt);
1843 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
1844 va_end(ap);
1846 /* get a new event context */
1847 ctdb->ev = tevent_context_init(ctdb);
1848 tevent_loop_allow_nesting(ctdb->ev);
1850 /* Connect to main CTDB daemon */
1851 ret = ctdb_socket_connect(ctdb);
1852 if (ret != 0) {
1853 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1854 return -1;
1857 ctdb->can_send_controls = true;
1859 return 0;