/*
 * ctdb-common: Add config file parsing code
 * [Samba.git] / ctdb / server / ctdb_daemon.c
 * blob 3b06972d030e6698a3f7972d3f9db7de3db536e2
 */
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
39 #include "common/version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
/* One entry per connected client: doubly-linked list node mapping the
 * peer process id to its ctdb_client structure.  The list head lives in
 * ctdb->client_pids; entries are removed by ctdb_clientpid_destructor. */
51 struct ctdb_client_pid_list {
52 struct ctdb_client_pid_list *next, *prev;
53 struct ctdb_context *ctdb;
54 pid_t pid;
55 struct ctdb_client *client;
/* Path of the daemon pidfile; NULL means no pidfile is written. */
58 const char *ctdbd_pidfile = NULL;
/* Owning context for the pidfile; freeing it removes the file. */
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
/* Forward declaration: dispatcher for packets arriving from clients. */
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* Pid of the main daemon process, used to suppress atexit work in
 * forked children.
 * NOTE(review): leading double underscore is a reserved identifier. */
63 static pid_t __ctdbd_pid;
65 static void print_exit_message(void)
67 if (getpid() == __ctdbd_pid) {
68 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
70 /* Wait a second to allow pending log messages to be flushed */
71 sleep(1);
/* One-second self-rescheduling dummy timer.  It keeps tevent busy so
 * that hang detection (ctdb_tevent_trace) stays reliable. */
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78 struct timeval t, void *private_data)
80 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
/* Do nothing in forked children of the main daemon. */
82 if (getpid() != ctdb->ctdbd_pid) {
83 return;
/* Re-arm the timer one second from now. */
86 tevent_add_timer(ctdb->ev, ctdb,
87 timeval_current_ofs(1, 0),
88 ctdb_time_tick, ctdb);
91 /* Used to trigger a dummy event once per second, to make
92 * detection of hangs more reliable.
/* Arm the first ctdb_time_tick timer; it reschedules itself. */
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
96 tevent_add_timer(ctdb->ev, ctdb,
97 timeval_current_ofs(1, 0),
98 ctdb_time_tick, ctdb);
/* Kick off all recurring daemon housekeeping once startup completes. */
101 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
103 /* start monitoring for connected/disconnected nodes */
104 ctdb_start_keepalive(ctdb);
106 /* start periodic update of tcp tickle lists */
107 ctdb_start_tcp_tickle_update(ctdb);
109 /* start listening for recovery daemon pings */
110 ctdb_control_recd_ping(ctdb);
112 /* start listening to timer ticks */
113 ctdb_start_time_tickd(ctdb);
116 static void ignore_signal(int signum)
118 struct sigaction act;
120 memset(&act, 0, sizeof(act));
122 act.sa_handler = SIG_IGN;
123 sigemptyset(&act.sa_mask);
124 sigaddset(&act.sa_mask, signum);
125 sigaction(signum, &act, NULL);
/* Queue a packet for delivery to a local client over its unix socket.
 * Returns the queue result, or -1 after killing an overloaded client. */
130 send a packet to a client
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
134 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
/* Async messages only: if the client's queue exceeds the tunable
 * depth, disconnect it rather than buffering without bound. */
135 if (hdr->operation == CTDB_REQ_MESSAGE) {
136 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
/* Freeing the client runs its destructor and tears the socket down. */
138 talloc_free(client);
139 return -1;
142 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
146 message handler for when we are in daemon mode. This redirects the message
147 to the right client
/* srvid callback: wrap the payload in a CTDB_REQ_MESSAGE packet and
 * forward it to the registered client. */
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
150 void *private_data)
152 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153 struct ctdb_req_message_old *r;
154 int len;
156 /* construct a message to send to the client containing the data */
157 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158 r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159 len, struct ctdb_req_message_old);
/* Silently returns on allocation failure (macro expands to return). */
160 CTDB_NO_MEMORY_VOID(client->ctdb, r);
162 talloc_set_name_const(r, "req_message packet");
164 r->srvid = srvid;
165 r->datalen = data.dsize;
166 memcpy(&r->data[0], data.dptr, data.dsize);
/* Best effort; on -1 the client has already been freed. */
168 daemon_queue_send(client, &r->hdr);
170 talloc_free(r);
174 this is called when the ctdb daemon received a ctdb request to
175 set the srvid from the client
/* Register daemon_message_handler for srvid on behalf of client_id.
 * Returns 0 on success, -1 for an unknown client, else srvid_register's
 * result. */
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
179 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
180 int res;
181 if (client == NULL) {
182 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
183 return -1;
185 res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
186 client);
187 if (res != 0) {
188 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
189 (unsigned long long)srvid));
190 } else {
191 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
192 (unsigned long long)srvid));
195 return res;
199 this is called when the ctdb daemon received a ctdb request to
200 remove a srvid from the client
/* Remove a previously registered srvid handler for client_id.
 * Returns -1 for an unknown client, else srvid_deregister's result. */
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
204 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205 if (client == NULL) {
206 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
207 return -1;
209 return srvid_deregister(ctdb->srv, srvid, client);
/* Tunnel callback: data carries a serialized ctdb_req_tunnel_old;
 * re-wrap it in a fresh CTDB_REQ_TUNNEL packet and forward it to the
 * client that registered for tunnel_id.
 * NOTE(review): pkt->datalen is trusted here — presumably validated by
 * the sender path; confirm before exposing to untrusted input. */
212 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
213 void *private_data)
215 struct ctdb_client *client =
216 talloc_get_type_abort(private_data, struct ctdb_client);
217 struct ctdb_req_tunnel_old *c, *pkt;
218 size_t len;
220 pkt = (struct ctdb_req_tunnel_old *)data.dptr;
222 len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
223 c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
224 len, struct ctdb_req_tunnel_old);
225 if (c == NULL) {
226 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
227 return;
230 talloc_set_name_const(c, "req_tunnel packet");
232 c->tunnel_id = tunnel_id;
233 c->flags = pkt->flags;
234 c->datalen = pkt->datalen;
235 memcpy(c->data, pkt->data, pkt->datalen);
237 daemon_queue_send(client, &c->hdr);
239 talloc_free(c);
243 destroy a ctdb_client
/* talloc destructor for a client: release takeover hooks and the reqid,
 * and force a recovery if the client dies mid-transaction, since its
 * in-flight persistent updates can no longer complete. */
245 static int ctdb_client_destructor(struct ctdb_client *client)
247 struct ctdb_db_context *ctdb_db;
249 ctdb_takeover_client_destructor_hook(client);
250 reqid_remove(client->ctdb->idr, client->client_id);
251 client->ctdb->num_clients--;
253 if (client->num_persistent_updates != 0) {
254 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* client->db_id is non-zero only while a trans3 commit is active. */
257 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
258 if (ctdb_db) {
259 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260 "commit active. Forcing recovery.\n"));
261 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
264 * trans3 transaction state:
266 * The destructor sets the pointer to NULL.
268 talloc_free(ctdb_db->persistent_state);
271 return 0;
276 this is called when the ctdb daemon received a ctdb request message
277 from a local client over the unix domain socket
/* Route a client's CTDB_REQ_MESSAGE: deliver locally when the
 * destination is this node, otherwise forward over the transport. */
279 static void daemon_request_message_from_client(struct ctdb_client *client,
280 struct ctdb_req_message_old *c)
282 TDB_DATA data;
283 int res;
/* Resolve CTDB_CURRENT_NODE to our pnn before routing. */
285 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
289 /* maybe the message is for another client on this node */
290 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292 return;
295 /* its for a remote node */
296 data.dptr = &c->data[0];
297 data.dsize = c->datalen;
298 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
299 c->srvid, data);
300 if (res != 0) {
301 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
302 c->hdr.destnode));
/* Per-call bookkeeping kept while a client REQ_CALL is in flight. */
307 struct daemon_call_state {
308 struct ctdb_client *client;
309 uint32_t reqid;
310 struct ctdb_call *call;
/* used for call latency statistics */
311 struct timeval start_time;
313 /* readonly request ? */
314 uint32_t readonly_fetch;
/* the call id the client originally sent, before any remap to
 * CTDB_FETCH_WITH_HEADER_FUNC for readonly fetches */
315 uint32_t client_callid;
319 complete a call from a client
/* Completion callback for a client-initiated call: collect the result,
 * build a CTDB_REPLY_CALL and queue it back to the client.  For
 * readonly fetches that were remapped to FETCH_WITH_HEADER the extra
 * ltdb header is stripped before replying. */
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
323 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
324 struct daemon_call_state);
325 struct ctdb_reply_call_old *r;
326 int res;
327 uint32_t length;
328 struct ctdb_client *client = dstate->client;
329 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* keep dstate/call alive past the freeing of state below */
331 talloc_steal(client, dstate);
332 talloc_steal(dstate, dstate->call);
334 res = ctdb_daemon_call_recv(state, dstate->call);
335 if (res != 0) {
336 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
339 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340 return;
343 length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344 /* If the client asked for readonly FETCH, we remapped this to
345 FETCH_WITH_HEADER when calling the daemon. So we must
346 strip the extra header off the reply data before passing
347 it back to the client.
349 if (dstate->readonly_fetch
350 && dstate->client_callid == CTDB_FETCH_FUNC) {
351 length -= sizeof(struct ctdb_ltdb_header);
354 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
355 length, struct ctdb_reply_call_old);
356 if (r == NULL) {
357 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
360 return;
362 r->hdr.reqid = dstate->reqid;
363 r->status = dstate->call->status;
365 if (dstate->readonly_fetch
366 && dstate->client_callid == CTDB_FETCH_FUNC) {
367 /* client only asked for a FETCH so we must strip off
368 the extra ctdb_ltdb header
370 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
372 } else {
373 r->datalen = dstate->call->reply_data.dsize;
374 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
377 res = daemon_queue_send(client, &r->hdr);
378 if (res == -1) {
379 /* client is dead - return immediately */
380 return;
382 if (res != 0) {
383 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
385 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
387 talloc_free(dstate);
/* Identifies the originating client by id (not pointer) so a requeued
 * packet can detect that the client has since disconnected. */
390 struct ctdb_daemon_packet_wrap {
391 struct ctdb_context *ctdb;
392 uint32_t client_id;
396 a wrapper to catch disconnected clients
/* Requeue entry point: look the client up by id again and drop the
 * packet if it has disconnected; otherwise dispatch normally. */
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
400 struct ctdb_client *client;
401 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
402 struct ctdb_daemon_packet_wrap);
403 if (w == NULL) {
404 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405 return;
408 client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409 if (client == NULL) {
410 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
411 w->client_id));
412 talloc_free(w);
413 return;
/* wrapper no longer needed once the client is resolved */
415 talloc_free(w);
417 /* process it */
418 daemon_incoming_packet(client, hdr);
/* One deferred duplicate fetch request, queued behind an in-flight
 * fetch for the same key. */
421 struct ctdb_deferred_fetch_call {
422 struct ctdb_deferred_fetch_call *next, *prev;
423 struct ctdb_req_call_old *c;
424 struct ctdb_daemon_packet_wrap *w;
/* Queue of deferred calls for one key; stored in the per-db
 * deferred_fetch rb-tree. */
427 struct ctdb_deferred_fetch_queue {
428 struct ctdb_deferred_fetch_call *deferred_calls;
/* Pairing of a deferred call with its (re-resolved) client, handed to
 * the zero-timeout requeue timer. */
431 struct ctdb_deferred_requeue {
432 struct ctdb_deferred_fetch_call *dfc;
433 struct ctdb_client *client;
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438 struct tevent_timer *te,
439 struct timeval t, void *private_data)
441 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442 struct ctdb_client *client = dfr->client;
/* hand packet ownership to the client before re-dispatching it */
444 talloc_steal(client, dfr->dfc->c);
445 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446 talloc_free(dfr);
449 /* the referral context is destroyed either after a timeout or when the initial
450 fetch-lock has finished.
451 at this stage, immediately start reprocessing the queued up deferred
452 calls so they get reprocessed immediately (and since we are dmaster at
453 this stage, trigger the waiting smbd processes to pick up and aquire the
454 record right away.
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
459 /* need to reprocess the packets from the queue explicitely instead of
460 just using a normal destructor since we want, need, to
461 call the clients in the same oder as the requests queued up
463 while (dfq->deferred_calls != NULL) {
464 struct ctdb_client *client;
465 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466 struct ctdb_deferred_requeue *dfr;
468 DLIST_REMOVE(dfq->deferred_calls, dfc);
/* the client may have disconnected while the call was deferred */
470 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471 if (client == NULL) {
472 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
473 dfc->w->client_id));
474 continue;
477 /* process it by pushing it back onto the eventloop */
478 dfr = talloc(client, struct ctdb_deferred_requeue);
479 if (dfr == NULL) {
480 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481 continue;
484 dfr->dfc = talloc_steal(dfr, dfc);
485 dfr->client = client;
/* zero timeout: fire on the next event-loop iteration */
487 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488 reprocess_deferred_call, dfr);
491 return 0;
494 /* insert the new deferral context into the rb tree.
495 there should never be a pre-existing context here, but check for it
496 warn and destroy the previous context if there is already a deferral context
497 for this key.
499 static void *insert_dfq_callback(void *parm, void *data)
501 if (data) {
502 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
503 talloc_free(data);
505 return parm;
508 /* if the original fetch-lock did not complete within a reasonable time,
509 free the context and context for all deferred requests to cause them to be
510 re-inserted into the event system.
/* Timer callback: private_data is the dfq; freeing it runs
 * deferred_fetch_queue_destructor, which requeues every deferred call. */
512 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
513 struct timeval t, void *private_data)
515 talloc_free(private_data);
518 /* This function is used in the local daemon to register a KEY in a database
519 for being "fetched"
520 While the remote fetch is in-flight, any futher attempts to re-fetch the
521 same record will be deferred until the fetch completes.
/* Returns 0 on success, -1 on allocation failure. */
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
525 uint32_t *k;
526 struct ctdb_deferred_fetch_queue *dfq;
528 k = ctdb_key_to_idkey(call, call->key);
529 if (k == NULL) {
530 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531 return -1;
/* dfq is parented to the call: it lives until the fetch completes
 * or the 30s timeout below fires, whichever comes first */
534 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
535 if (dfq == NULL) {
536 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537 talloc_free(k);
538 return -1;
540 dfq->deferred_calls = NULL;
542 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
/* destructor requeues all deferred calls when dfq is freed */
544 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
546 /* if the fetch havent completed in 30 seconds, just tear it all down
547 and let it try again as the events are reissued */
548 tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
549 dfq_timeout, dfq);
551 talloc_free(k);
552 return 0;
555 /* check if this is a duplicate request to a fetch already in-flight
556 if it is, make this call deferred to be reprocessed later when
557 the in-flight fetch completes.
/* Returns 0 when the call was deferred (caller must not process it
 * further), -1 when there is no in-flight fetch or on allocation
 * failure (caller proceeds normally). */
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
561 uint32_t *k;
562 struct ctdb_deferred_fetch_queue *dfq;
563 struct ctdb_deferred_fetch_call *dfc;
565 k = ctdb_key_to_idkey(c, key);
566 if (k == NULL) {
567 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568 return -1;
/* no dfq for this key means no fetch currently in flight */
571 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
572 if (dfq == NULL) {
573 talloc_free(k);
574 return -1;
578 talloc_free(k);
580 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
581 if (dfc == NULL) {
582 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583 return -1;
586 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587 if (dfc->w == NULL) {
588 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
589 talloc_free(dfc);
590 return -1;
/* take ownership of the request packet so it survives deferral */
593 dfc->c = talloc_steal(dfc, c);
594 dfc->w->ctdb = ctdb_db->ctdb;
595 dfc->w->client_id = client->client_id;
597 DLIST_ADD_END(dfq->deferred_calls, dfc);
599 return 0;
604 this is called when the ctdb daemon received a ctdb request call
605 from a local client over the unix domain socket
/* Main REQ_CALL path: lock-fetch the record, handle readonly
 * delegation/revocation states, then run the call locally or remotely.
 * Statement order here is critical — stats, ltdb lock/unlock and
 * deferral hand-offs must happen exactly as written. */
607 static void daemon_request_call_from_client(struct ctdb_client *client,
608 struct ctdb_req_call_old *c)
610 struct ctdb_call_state *state;
611 struct ctdb_db_context *ctdb_db;
612 struct daemon_call_state *dstate;
613 struct ctdb_call *call;
614 struct ctdb_ltdb_header header;
615 TDB_DATA key, data;
616 int ret;
617 struct ctdb_context *ctdb = client->ctdb;
618 struct ctdb_daemon_packet_wrap *w;
620 CTDB_INCREMENT_STAT(ctdb, total_calls);
621 CTDB_INCREMENT_STAT(ctdb, pending_calls);
623 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
624 if (!ctdb_db) {
625 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
626 c->db_id));
627 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628 return;
631 if (ctdb_db->unhealthy_reason) {
633 * this is just a warning, as the tdb should be empty anyway,
634 * and only persistent databases can be unhealthy, which doesn't
635 * use this code patch
637 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* the record key is the first keylen bytes of the packet payload */
641 key.dptr = c->data;
642 key.dsize = c->keylen;
644 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645 CTDB_NO_MEMORY_VOID(ctdb, w);
647 w->ctdb = ctdb;
648 w->client_id = client->client_id;
/* -2 means the record was busy and the packet has been requeued;
 * ownership of w passed to the requeue machinery */
650 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
651 (struct ctdb_req_header *)c, &data,
652 daemon_incoming_packet_wrap, w, true);
653 if (ret == -2) {
654 /* will retry later */
655 CTDB_DECREMENT_STAT(ctdb, pending_calls);
656 return;
659 talloc_free(w);
661 if (ret != 0) {
662 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664 return;
668 /* check if this fetch request is a duplicate for a
669 request we already have in flight. If so defer it until
670 the first request completes.
672 if (ctdb->tunable.fetch_collapse == 1) {
673 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674 ret = ctdb_ltdb_unlock(ctdb_db, key);
675 if (ret != 0) {
676 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
678 CTDB_DECREMENT_STAT(ctdb, pending_calls);
679 talloc_free(data.dptr);
680 return;
684 /* Dont do READONLY if we don't have a tracking database */
685 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
686 c->flags &= ~CTDB_WANT_READONLY;
/* a completed revoke clears all readonly flags on the record */
689 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
690 header.flags &= ~CTDB_REC_RO_FLAGS;
691 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
692 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
693 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
694 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
696 /* and clear out the tracking data */
697 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
698 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
702 /* if we are revoking, we must defer all other calls until the revoke
703 * had completed.
705 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
706 talloc_free(data.dptr);
707 ret = ctdb_ltdb_unlock(ctdb_db, key);
709 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
710 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
712 CTDB_DECREMENT_STAT(ctdb, pending_calls);
713 return;
/* we are dmaster, the client wants a writable copy, and delegations
 * exist: start revoking them and defer this call until done */
716 if ((header.dmaster == ctdb->pnn)
717 && (!(c->flags & CTDB_WANT_READONLY))
718 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
719 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
720 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
721 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
723 ret = ctdb_ltdb_unlock(ctdb_db, key);
725 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
726 ctdb_fatal(ctdb, "Failed to start record revoke");
728 talloc_free(data.dptr);
730 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
731 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
734 CTDB_DECREMENT_STAT(ctdb, pending_calls);
735 return;
738 dstate = talloc(client, struct daemon_call_state);
739 if (dstate == NULL) {
740 ret = ctdb_ltdb_unlock(ctdb_db, key);
741 if (ret != 0) {
742 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
745 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
746 CTDB_DECREMENT_STAT(ctdb, pending_calls);
747 return;
749 dstate->start_time = timeval_current();
750 dstate->client = client;
751 dstate->reqid = c->hdr.reqid;
752 talloc_steal(dstate, data.dptr);
754 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
755 if (call == NULL) {
756 ret = ctdb_ltdb_unlock(ctdb_db, key);
757 if (ret != 0) {
758 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
761 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
762 CTDB_DECREMENT_STAT(ctdb, pending_calls);
763 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
764 return;
767 dstate->readonly_fetch = 0;
768 call->call_id = c->callid;
769 call->key = key;
/* call data follows the key in the packet payload */
770 call->call_data.dptr = c->data + c->keylen;
771 call->call_data.dsize = c->calldatalen;
772 call->flags = c->flags;
774 if (c->flags & CTDB_WANT_READONLY) {
775 /* client wants readonly record, so translate this into a
776 fetch with header. remember what the client asked for
777 so we can remap the reply back to the proper format for
778 the client in the reply
780 dstate->client_callid = call->call_id;
781 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
782 dstate->readonly_fetch = 1;
/* dispatch locally when we are dmaster, otherwise send remote */
785 if (header.dmaster == ctdb->pnn) {
786 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
787 } else {
788 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
789 if (ctdb->tunable.fetch_collapse == 1) {
790 /* This request triggered a remote fetch-lock.
791 set up a deferral for this key so any additional
792 fetch-locks are deferred until the current one
793 finishes.
795 setup_deferred_fetch_locks(ctdb_db, call);
799 ret = ctdb_ltdb_unlock(ctdb_db, key);
800 if (ret != 0) {
801 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
804 if (state == NULL) {
805 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
806 CTDB_DECREMENT_STAT(ctdb, pending_calls);
807 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
808 return;
/* completion is delivered via daemon_call_from_client_callback */
810 talloc_steal(state, dstate);
811 talloc_steal(client, state);
813 state->async.fn = daemon_call_from_client_callback;
814 state->async.private_data = dstate;
/* Forward declarations for the dispatchers used by daemon_incoming_packet. */
818 static void daemon_request_control_from_client(struct ctdb_client *client,
819 struct ctdb_req_control_old *c);
820 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
821 struct ctdb_req_tunnel_old *c);
823 /* data contains a packet from the client */
/* Validate magic/version, then dispatch by operation to the
 * call/message/control/tunnel handlers.  Handlers that need to keep the
 * packet steal it away from tmp_ctx. */
824 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
826 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
827 TALLOC_CTX *tmp_ctx;
828 struct ctdb_context *ctdb = client->ctdb;
830 /* place the packet as a child of a tmp_ctx. We then use
831 talloc_free() below to free it. If any of the calls want
832 to keep it, then they will steal it somewhere else, and the
833 talloc_free() will be a no-op */
834 tmp_ctx = talloc_new(client);
835 talloc_steal(tmp_ctx, hdr);
837 if (hdr->ctdb_magic != CTDB_MAGIC) {
838 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
839 goto done;
842 if (hdr->ctdb_version != CTDB_PROTOCOL) {
843 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
844 goto done;
847 switch (hdr->operation) {
848 case CTDB_REQ_CALL:
849 CTDB_INCREMENT_STAT(ctdb, client.req_call);
850 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
851 break;
853 case CTDB_REQ_MESSAGE:
854 CTDB_INCREMENT_STAT(ctdb, client.req_message);
855 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
856 break;
858 case CTDB_REQ_CONTROL:
859 CTDB_INCREMENT_STAT(ctdb, client.req_control);
860 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
861 break;
863 case CTDB_REQ_TUNNEL:
864 CTDB_INCREMENT_STAT(ctdb, client.req_tunnel);
865 daemon_request_tunnel_from_client(client, (struct ctdb_req_tunnel_old *)hdr);
866 break;
868 default:
869 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
870 hdr->operation));
873 done:
874 talloc_free(tmp_ctx);
878 called when the daemon gets a incoming packet
/* Queue read callback for a client socket.  cnt==0 signals EOF: free
 * the client (running its destructor).  This function owns 'data'. */
880 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
882 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
883 struct ctdb_req_header *hdr;
885 if (cnt == 0) {
886 talloc_free(client);
887 return;
890 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
/* NOTE(review): this short-packet path returns without freeing
 * 'data', unlike the err_out path below — looks like a leak; confirm
 * against the queue's ownership contract. */
892 if (cnt < sizeof(*hdr)) {
893 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
894 (unsigned)cnt);
895 return;
897 hdr = (struct ctdb_req_header *)data;
899 if (hdr->ctdb_magic != CTDB_MAGIC) {
900 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
901 goto err_out;
904 if (hdr->ctdb_version != CTDB_PROTOCOL) {
905 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
906 goto err_out;
909 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
910 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
911 hdr->srcnode, hdr->destnode));
913 /* it is the responsibility of the incoming packet function to free 'data' */
914 daemon_incoming_packet(client, hdr);
915 return;
917 err_out:
918 TALLOC_FREE(data);
/* talloc destructor: unlink this entry from the daemon's client pid
 * list when it is freed. */
922 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
924 if (client_pid->ctdb->client_pids != NULL) {
925 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
928 return 0;
932 static void ctdb_accept_client(struct tevent_context *ev,
933 struct tevent_fd *fde, uint16_t flags,
934 void *private_data)
936 struct sockaddr_un addr;
937 socklen_t len;
938 int fd;
939 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
940 struct ctdb_client *client;
941 struct ctdb_client_pid_list *client_pid;
942 pid_t peer_pid = 0;
943 int ret;
945 memset(&addr, 0, sizeof(addr));
946 len = sizeof(addr);
947 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
948 if (fd == -1) {
949 return;
951 smb_set_close_on_exec(fd);
953 ret = set_blocking(fd, false);
954 if (ret != 0) {
955 DEBUG(DEBUG_ERR,
956 (__location__
957 " failed to set socket non-blocking (%s)\n",
958 strerror(errno)));
959 close(fd);
960 return;
963 set_close_on_exec(fd);
965 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
967 client = talloc_zero(ctdb, struct ctdb_client);
968 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
969 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
972 client->ctdb = ctdb;
973 client->fd = fd;
974 client->client_id = reqid_new(ctdb->idr, client);
975 client->pid = peer_pid;
977 client_pid = talloc(client, struct ctdb_client_pid_list);
978 if (client_pid == NULL) {
979 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
980 close(fd);
981 talloc_free(client);
982 return;
984 client_pid->ctdb = ctdb;
985 client_pid->pid = peer_pid;
986 client_pid->client = client;
988 DLIST_ADD(ctdb->client_pids, client_pid);
990 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
991 ctdb_daemon_read_cb, client,
992 "client-%u", client->pid);
994 talloc_set_destructor(client, ctdb_client_destructor);
995 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
996 ctdb->num_clients++;
1002 create a unix domain socket and bind it
1003 return a file descriptor open on the socket
1005 static int ux_socket_bind(struct ctdb_context *ctdb)
1007 struct sockaddr_un addr;
1008 int ret;
1010 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1011 if (ctdb->daemon.sd == -1) {
1012 return -1;
1015 memset(&addr, 0, sizeof(addr));
1016 addr.sun_family = AF_UNIX;
1017 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1019 if (! sock_clean(ctdb->daemon.name)) {
1020 return -1;
1023 set_close_on_exec(ctdb->daemon.sd);
1025 ret = set_blocking(ctdb->daemon.sd, false);
1026 if (ret != 0) {
1027 DEBUG(DEBUG_ERR,
1028 (__location__
1029 " failed to set socket non-blocking (%s)\n",
1030 strerror(errno)));
1031 goto failed;
1034 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1035 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1036 goto failed;
1039 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1040 chmod(ctdb->daemon.name, 0700) != 0) {
1041 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1042 goto failed;
1046 if (listen(ctdb->daemon.sd, 100) != 0) {
1047 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1048 goto failed;
1051 DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1052 ctdb->daemon.name));
1053 return 0;
1055 failed:
1056 close(ctdb->daemon.sd);
1057 ctdb->daemon.sd = -1;
1058 return -1;
/* Set this node's startup flags: clear DISCONNECTED for ourselves and
 * apply the configured start-as-disabled/stopped states.  Fatal if the
 * pnn has not been resolved yet. */
1061 static void initialise_node_flags (struct ctdb_context *ctdb)
1063 if (ctdb->pnn == -1) {
1064 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1067 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1069 /* do we start out in DISABLED mode? */
1070 if (ctdb->start_as_disabled != 0) {
1071 DEBUG(DEBUG_ERR,
1072 ("This node is configured to start in DISABLED state\n"));
1073 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1075 /* do we start out in STOPPED mode? */
1076 if (ctdb->start_as_stopped != 0) {
1077 DEBUG(DEBUG_ERR,
1078 ("This node is configured to start in STOPPED state\n"));
1079 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/* Completion callback for the "setup" event script: on success start
 * the recovery daemon and periodic events, then wait for the first
 * recovery; dies/exits on failure. */
1083 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1084 void *private_data)
1086 if (status != 0) {
1087 ctdb_die(ctdb, "Failed to run setup event");
1089 ctdb_run_notification_script(ctdb, "setup");
1091 /* Start the recovery daemon */
1092 if (ctdb_start_recoverd(ctdb) != 0) {
1093 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1094 exit(11);
1097 ctdb_start_periodic_events(ctdb);
1099 ctdb_wait_for_first_recovery(ctdb);
1102 static struct timeval tevent_before_wait_ts;
1103 static struct timeval tevent_after_wait_ts;
1105 static void ctdb_tevent_trace_init(void)
1107 struct timeval now;
1109 now = timeval_current();
1111 tevent_before_wait_ts = now;
1112 tevent_after_wait_ts = now;
1115 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1116 void *private_data)
1118 struct timeval diff;
1119 struct timeval now;
1120 struct ctdb_context *ctdb =
1121 talloc_get_type(private_data, struct ctdb_context);
1123 if (getpid() != ctdb->ctdbd_pid) {
1124 return;
1127 now = timeval_current();
1129 switch (tp) {
1130 case TEVENT_TRACE_BEFORE_WAIT:
1131 diff = timeval_until(&tevent_after_wait_ts, &now);
1132 if (diff.tv_sec > 3) {
1133 DEBUG(DEBUG_ERR,
1134 ("Handling event took %ld seconds!\n",
1135 (long)diff.tv_sec));
1137 tevent_before_wait_ts = now;
1138 break;
1140 case TEVENT_TRACE_AFTER_WAIT:
1141 diff = timeval_until(&tevent_before_wait_ts, &now);
1142 if (diff.tv_sec > 3) {
1143 DEBUG(DEBUG_ERR,
1144 ("No event for %ld seconds!\n",
1145 (long)diff.tv_sec));
1147 tevent_after_wait_ts = now;
1148 break;
1150 default:
1151 /* Do nothing for future tevent trace points */ ;
1155 static void ctdb_remove_pidfile(void)
1157 TALLOC_FREE(ctdbd_pidfile_ctx);
1160 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1162 if (ctdbd_pidfile != NULL) {
1163 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1164 &ctdbd_pidfile_ctx);
1165 if (ret != 0) {
1166 DEBUG(DEBUG_ERR,
1167 ("Failed to create PID file %s\n",
1168 ctdbd_pidfile));
1169 exit(11);
1172 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1173 atexit(ctdb_remove_pidfile);
1177 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1179 int i, j, count;
1181 /* initialize the vnn mapping table, skipping any deleted nodes */
1182 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1183 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1185 count = 0;
1186 for (i = 0; i < ctdb->num_nodes; i++) {
1187 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1188 count++;
1192 ctdb->vnn_map->generation = INVALID_GENERATION;
1193 ctdb->vnn_map->size = count;
1194 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1195 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1197 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1198 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1199 continue;
1201 ctdb->vnn_map->map[j] = i;
1202 j++;
/*
 * Determine this node's PNN from its node address (which must have
 * been set by the transport or the command line) and store it in
 * ctdb->pnn.  Fatal if the address is unset or not in the node list.
 */
static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
{
	int nodeid;

	if (ctdb->address == NULL) {
		ctdb_fatal(ctdb,
			   "Can not determine PNN - node address is not set\n");
	}

	nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
	if (nodeid == -1) {
		ctdb_fatal(ctdb,
			   "Can not determine PNN - node address not found in node list\n");
	}

	ctdb->pnn = ctdb->nodes[nodeid]->pnn;
	DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
}
/*
  start the protocol going as a daemon

  Performs the full daemon start-up sequence: daemonise, bind the unix
  socket, set up tevent, initialise the transport, attach databases,
  freeze, start accepting clients and the transport, schedule the
  "setup" event and finally enter the event loop (which never returns).
  The ordering of these steps matters; do not reorder them.
 */
int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
{
	int res, ret = -1;
	struct tevent_fd *fde;

	become_daemon(do_fork, false, false);

	/* These signals must never kill the daemon */
	ignore_signal(SIGPIPE);
	ignore_signal(SIGUSR1);

	ctdb->ctdbd_pid = getpid();
	DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
			  ctdb_version_string, ctdb->ctdbd_pid));
	ctdb_create_pidfile(ctdb);

	/* create a unix domain stream socket to listen to */
	res = ux_socket_bind(ctdb);
	if (res!=0) {
		DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
		exit(10);
	}

	/* Make sure we log something when the daemon terminates.
	 * This must be the first exit handler to run (so the last to
	 * be registered.
	 */
	__ctdbd_pid = getpid();
	atexit(print_exit_message);

	if (ctdb->do_setsched) {
		/* try to set us up as realtime */
		if (!set_scheduler()) {
			exit(1);
		}
		DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
	}

	ctdb->ev = tevent_context_init(NULL);
	if (ctdb->ev == NULL) {
		DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
		exit(1);
	}
	tevent_loop_allow_nesting(ctdb->ev);
	/* install the slow-event/slow-wait watchdog tracing */
	ctdb_tevent_trace_init();
	tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);

	/* set up a handler to pick up sigchld */
	if (ctdb_init_sigchld(ctdb) == NULL) {
		DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
		exit(1);
	}

	if (do_fork) {
		ctdb_set_child_logging(ctdb);
	}

	/* srvid context for local client messaging */
	TALLOC_FREE(ctdb->srv);
	if (srvid_init(ctdb, &ctdb->srv) != 0) {
		DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
		exit(1);
	}

	/* srvid context for registered tunnels */
	TALLOC_FREE(ctdb->tunnels);
	if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
		DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
		exit(1);
	}

	/* initialize statistics collection */
	ctdb_statistics_init(ctdb);

	/* force initial recovery for election */
	ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;

	if (ctdb_start_eventd(ctdb) != 0) {
		DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
		exit(1);
	}

	ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
	ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
	if (ret != 0) {
		ctdb_die(ctdb, "Failed to run init event\n");
	}
	ctdb_run_notification_script(ctdb, "init");

	/* pick the configured transport; ret stays -1 if the name
	 * matches none of the compiled-in transports */
	if (strcmp(ctdb->transport, "tcp") == 0) {
		ret = ctdb_tcp_init(ctdb);
	}
#ifdef USE_INFINIBAND
	if (strcmp(ctdb->transport, "ib") == 0) {
		ret = ctdb_ibw_init(ctdb);
	}
#endif
	if (ret != 0) {
		DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
		return -1;
	}

	if (ctdb->methods == NULL) {
		DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
		ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
	}

	/* Initialise the transport. This sets the node address if it
	 * was not set via the command-line. */
	if (ctdb->methods->initialise(ctdb) != 0) {
		ctdb_fatal(ctdb, "transport failed to initialise");
	}

	/* PNN and node flags need the node address, so they come after
	 * transport initialisation */
	ctdb_set_my_pnn(ctdb);

	initialise_node_flags(ctdb);

	ret = ctdb_set_public_addresses(ctdb, true);
	if (ret == -1) {
		D_ERR("Unable to setup public IP addresses\n");
		exit(1);
	}

	ctdb_initialise_vnn_map(ctdb);

	/* attach to existing databases */
	if (ctdb_attach_databases(ctdb) != 0) {
		ctdb_fatal(ctdb, "Failed to attach to databases\n");
	}

	/* start frozen, then let the first election sort things out */
	if (!ctdb_blocking_freeze(ctdb)) {
		ctdb_fatal(ctdb, "Failed to get initial freeze\n");
	}

	/* now start accepting clients, only can do this once frozen */
	fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
			    ctdb_accept_client, ctdb);
	if (fde == NULL) {
		ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
	}
	tevent_fd_set_auto_close(fde);

	/* Start the transport */
	if (ctdb->methods->start(ctdb) != 0) {
		DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
		ctdb_fatal(ctdb, "transport failed to start");
	}

	/* Recovery daemon and timed events are started from the
	 * callback, only after the setup event completes
	 * successfully.
	 */
	ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
	ret = ctdb_event_script_callback(ctdb,
					 ctdb,
					 ctdb_setup_event_callback,
					 ctdb,
					 CTDB_EVENT_SETUP,
					 "%s",
					 "");
	if (ret != 0) {
		DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
		exit(1);
	}

	lockdown_memory(ctdb->valgrinding);

	/* go into a wait loop to allow other nodes to complete */
	tevent_loop_wait(ctdb->ev);

	DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
	exit(1);
}
/*
  allocate a packet for use in daemon<->daemon communication

  length  - the on-the-wire packet length requested by the caller
  slength - the size of the C structure being mapped onto the packet
  type    - talloc name for the allocation (the struct's type name)

  Returns a zero-initialised (up to slength) header with the common
  fields filled in, or NULL if the transport is down or allocation
  fails.
 */
struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
						 TALLOC_CTX *mem_ctx,
						 enum ctdb_operation operation,
						 size_t length, size_t slength,
						 const char *type)
{
	int size;
	struct ctdb_req_header *hdr;

	/* the wire length is at least as large as the structure */
	length = MAX(length, slength);
	/* round the allocation size up to the transport alignment */
	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);

	if (ctdb->methods == NULL) {
		/* transport is down - packets cannot be allocated */
		DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
			 operation, (unsigned)length));
		return NULL;
	}

	hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
	if (hdr == NULL) {
		DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
			 operation, (unsigned)length));
		return NULL;
	}
	talloc_set_name_const(hdr, type);
	/* zero only the structure portion; any trailing payload is
	 * filled in by the caller */
	memset(hdr, 0, slength);
	hdr->length = length;
	hdr->operation = operation;
	hdr->ctdb_magic = CTDB_MAGIC;
	hdr->ctdb_version = CTDB_PROTOCOL;
	hdr->generation = ctdb->vnn_map->generation;
	hdr->srcnode = ctdb->pnn;

	return hdr;
}
/*
 * Per-request state for a control forwarded on behalf of a local
 * client.  Linked into the destination node's pending_controls list
 * (via next/prev) so the request can be failed if that node
 * disconnects before replying.
 */
struct daemon_control_state {
	struct daemon_control_state *next, *prev; /* node->pending_controls list */
	struct ctdb_client *client;	/* local client awaiting the reply */
	struct ctdb_req_control_old *c;	/* original request (stolen onto state) */
	uint32_t reqid;			/* request id echoed back in the reply */
	struct ctdb_node *node;		/* destination node, NULL if pnn invalid */
};
/*
  callback when a control reply comes in

  Packages status/data/errormsg into a CTDB_REPLY_CONTROL packet and
  queues it to the originating client.  The error string, if any, is
  appended directly after the data payload.
 */
static void daemon_control_callback(struct ctdb_context *ctdb,
				    int32_t status, TDB_DATA data,
				    const char *errormsg,
				    void *private_data)
{
	struct daemon_control_state *state = talloc_get_type(private_data,
							     struct daemon_control_state);
	struct ctdb_client *client = state->client;
	struct ctdb_reply_control_old *r;
	size_t len;
	int ret;

	/* construct a message to send to the client containing the data */
	len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
	if (errormsg) {
		len += strlen(errormsg);
	}
	r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
			       struct ctdb_reply_control_old);
	CTDB_NO_MEMORY_VOID(ctdb, r);

	r->hdr.reqid = state->reqid;
	r->status = status;
	r->datalen = data.dsize;
	r->errorlen = 0;
	memcpy(&r->data[0], data.dptr, data.dsize);
	if (errormsg) {
		/* error text is placed immediately after the data */
		r->errorlen = strlen(errormsg);
		memcpy(&r->data[r->datalen], errormsg, r->errorlen);
	}

	ret = daemon_queue_send(client, &r->hdr);
	if (ret != -1) {
		/* freed only on successful queueing; NOTE(review): on
		 * failure state stays allocated (parented elsewhere) -
		 * confirm intended against daemon_queue_send semantics */
		talloc_free(state);
	}
}
1488 fail all pending controls to a disconnected node
1490 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1492 struct daemon_control_state *state;
1493 while ((state = node->pending_controls)) {
1494 DLIST_REMOVE(node->pending_controls, state);
1495 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1496 "node is disconnected", state);
1501 destroy a daemon_control_state
1503 static int daemon_control_destructor(struct daemon_control_state *state)
1505 if (state->node) {
1506 DLIST_REMOVE(state->node->pending_controls, state);
1508 return 0;
/*
  this is called when the ctdb daemon received a ctdb request control
  from a local client over the unix domain socket

  Forwards the control towards its destination node.  Talloc lifetime
  is subtle: state is parented on the client so it survives until the
  reply callback runs; for NOREPLY controls it is re-parented onto
  tmp_ctx so it is freed as soon as the control has been sent.
 */
static void daemon_request_control_from_client(struct ctdb_client *client,
					       struct ctdb_req_control_old *c)
{
	TDB_DATA data;
	int res;
	struct daemon_control_state *state;
	TALLOC_CTX *tmp_ctx = talloc_new(client);

	/* resolve CTDB_CURRENT_NODE to our own pnn */
	if (c->hdr.destnode == CTDB_CURRENT_NODE) {
		c->hdr.destnode = client->ctdb->pnn;
	}

	state = talloc(client, struct daemon_control_state);
	CTDB_NO_MEMORY_VOID(client->ctdb, state);

	state->client = client;
	state->c = talloc_steal(state, c);
	state->reqid = c->hdr.reqid;
	if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
		/* track the request on the destination node so it can
		 * be failed if that node disconnects */
		state->node = client->ctdb->nodes[c->hdr.destnode];
		DLIST_ADD(state->node->pending_controls, state);
	} else {
		state->node = NULL;
	}

	talloc_set_destructor(state, daemon_control_destructor);

	if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
		/* no reply expected: free state together with tmp_ctx
		 * at the end of this function */
		talloc_steal(tmp_ctx, state);
	}

	data.dptr = &c->data[0];
	data.dsize = c->datalen;
	res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
				       c->srvid, c->opcode, client->client_id,
				       c->flags,
				       data, daemon_control_callback,
				       state);
	if (res != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
			 c->hdr.destnode));
	}

	talloc_free(tmp_ctx);
}
1561 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
1562 struct ctdb_req_tunnel_old *c)
1564 TDB_DATA data;
1565 int ret;
1567 if (! ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1568 DEBUG(DEBUG_ERR, ("Invalid destination 0x%x\n",
1569 c->hdr.destnode));
1570 return;
1573 ret = srvid_exists(client->ctdb->tunnels, c->tunnel_id, NULL);
1574 if (ret != 0) {
1575 DEBUG(DEBUG_ERR,
1576 ("tunnel id 0x%"PRIx64" not registered, dropping pkt\n",
1577 c->tunnel_id));
1578 return;
1581 data = (TDB_DATA) {
1582 .dsize = c->datalen,
1583 .dptr = &c->data[0],
1586 ret = ctdb_daemon_send_tunnel(client->ctdb, c->hdr.destnode,
1587 c->tunnel_id, c->flags, data);
1588 if (ret != 0) {
1589 DEBUG(DEBUG_ERR, ("Failed to set tunnel to remote note %u\n",
1590 c->hdr.destnode));
1595 register a call function
1597 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1598 ctdb_fn_t fn, int id)
1600 struct ctdb_registered_call *call;
1601 struct ctdb_db_context *ctdb_db;
1603 ctdb_db = find_ctdb_db(ctdb, db_id);
1604 if (ctdb_db == NULL) {
1605 return -1;
1608 call = talloc(ctdb_db, struct ctdb_registered_call);
1609 call->fn = fn;
1610 call->id = id;
1612 DLIST_ADD(ctdb_db->calls, call);
1613 return 0;
/*
  this local messaging handler is ugly, but is needed to prevent
  recursion in ctdb_send_message() when the destination node is the
  same as the source node
 */
struct ctdb_local_message {
	struct ctdb_context *ctdb;	/* daemon context used for dispatch */
	uint64_t srvid;			/* destination server id */
	TDB_DATA data;			/* payload; dptr is a private copy owned by this struct */
};
1629 static void ctdb_local_message_trigger(struct tevent_context *ev,
1630 struct tevent_timer *te,
1631 struct timeval t, void *private_data)
1633 struct ctdb_local_message *m = talloc_get_type(
1634 private_data, struct ctdb_local_message);
1636 srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1637 talloc_free(m);
1640 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1642 struct ctdb_local_message *m;
1643 m = talloc(ctdb, struct ctdb_local_message);
1644 CTDB_NO_MEMORY(ctdb, m);
1646 m->ctdb = ctdb;
1647 m->srvid = srvid;
1648 m->data = data;
1649 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1650 if (m->data.dptr == NULL) {
1651 talloc_free(m);
1652 return -1;
1655 /* this needs to be done as an event to prevent recursion */
1656 tevent_add_timer(ctdb->ev, m, timeval_zero(),
1657 ctdb_local_message_trigger, m);
1658 return 0;
1662 send a ctdb message
1664 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1665 uint64_t srvid, TDB_DATA data)
1667 struct ctdb_req_message_old *r;
1668 int len;
1670 if (ctdb->methods == NULL) {
1671 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1672 return -1;
1675 /* see if this is a message to ourselves */
1676 if (pnn == ctdb->pnn) {
1677 return ctdb_local_message(ctdb, srvid, data);
1680 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1681 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1682 struct ctdb_req_message_old);
1683 CTDB_NO_MEMORY(ctdb, r);
1685 r->hdr.destnode = pnn;
1686 r->srvid = srvid;
1687 r->datalen = data.dsize;
1688 memcpy(&r->data[0], data.dptr, data.dsize);
1690 ctdb_queue_packet(ctdb, &r->hdr);
1692 talloc_free(r);
1693 return 0;
/*
 * A notification registered by a client: when the client structure is
 * freed (i.e. the client disconnects), the destructor broadcasts
 * "data" to "srvid" on all connected nodes.
 */
struct ctdb_client_notify_list {
	struct ctdb_client_notify_list *next, *prev; /* in client->notify */
	struct ctdb_context *ctdb;
	uint64_t srvid;		/* srvid the notification is sent to */
	TDB_DATA data;		/* payload supplied at registration time */
};
1706 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1708 int ret;
1710 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1712 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1713 if (ret != 0) {
1714 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1717 return 0;
1720 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1722 struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1723 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1724 struct ctdb_client_notify_list *nl;
1726 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1728 if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1729 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1730 return -1;
1733 if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1734 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1735 return -1;
1739 if (client == NULL) {
1740 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1741 return -1;
1744 for(nl=client->notify; nl; nl=nl->next) {
1745 if (nl->srvid == notify->srvid) {
1746 break;
1749 if (nl != NULL) {
1750 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1751 return -1;
1754 nl = talloc(client, struct ctdb_client_notify_list);
1755 CTDB_NO_MEMORY(ctdb, nl);
1756 nl->ctdb = ctdb;
1757 nl->srvid = notify->srvid;
1758 nl->data.dsize = notify->len;
1759 nl->data.dptr = talloc_memdup(nl, notify->notify_data,
1760 nl->data.dsize);
1761 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1763 DLIST_ADD(client->notify, nl);
1764 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1766 return 0;
1769 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1771 uint64_t srvid = *(uint64_t *)indata.dptr;
1772 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1773 struct ctdb_client_notify_list *nl;
1775 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1777 if (client == NULL) {
1778 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1779 return -1;
1782 for(nl=client->notify; nl; nl=nl->next) {
1783 if (nl->srvid == srvid) {
1784 break;
1787 if (nl == NULL) {
1788 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1789 return -1;
1792 DLIST_REMOVE(client->notify, nl);
1793 talloc_set_destructor(nl, NULL);
1794 talloc_free(nl);
1796 return 0;
1799 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1801 struct ctdb_client_pid_list *client_pid;
1803 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1804 if (client_pid->pid == pid) {
1805 return client_pid->client;
1808 return NULL;
1812 /* This control is used by samba when probing if a process (of a samba daemon)
1813 exists on the node.
1814 Samba does this when it needs/wants to check if a subrecord in one of the
1815 databases is still valid, or if it is stale and can be removed.
1816 If the node is in unhealthy or stopped state we just kill of the samba
1817 process holding this sub-record and return to the calling samba that
1818 the process does not exist.
1819 This allows us to forcefully recall subrecords registered by samba processes
1820 on banned and stopped nodes.
1822 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1824 struct ctdb_client *client;
1826 client = ctdb_find_client_by_pid(ctdb, pid);
1827 if (client == NULL) {
1828 return -1;
1831 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1832 DEBUG(DEBUG_NOTICE,
1833 ("Killing client with pid:%d on banned/stopped node\n",
1834 (int)pid));
1835 talloc_free(client);
1836 return -1;
1839 return kill(pid, 0);
1842 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
1843 TDB_DATA indata)
1845 struct ctdb_client_pid_list *client_pid;
1846 pid_t pid;
1847 uint64_t srvid;
1848 int ret;
1850 pid = *(pid_t *)indata.dptr;
1851 srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
1853 for (client_pid = ctdb->client_pids;
1854 client_pid != NULL;
1855 client_pid = client_pid->next) {
1856 if (client_pid->pid == pid) {
1857 ret = srvid_exists(ctdb->srv, srvid,
1858 client_pid->client);
1859 if (ret == 0) {
1860 return 0;
1865 return -1;
/*
 * Control handler: re-read the nodes file from disk and return its
 * contents as a ctdb_node_map_old blob in outdata.  Takes no input
 * data.  Returns 0 on success, -1 if the file cannot be read.
 */
int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_node_map_old *node_map = NULL;

	CHECK_CONTROL_DATA_SIZE(0);

	node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
	if (node_map == NULL) {
		DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
		return -1;
	}

	/* the talloc'd map is handed to the caller via outdata */
	outdata->dptr  = (unsigned char *)node_map;
	outdata->dsize = talloc_get_size(outdata->dptr);

	return 0;
}
/*
 * Run the ordered shutdown sequence and exit the process with
 * exit_code.  Re-entrant calls are ignored (guarded by the SHUTDOWN
 * runstate).  The order matters: stop recovery/monitoring first,
 * release IPs and run the shutdown event before stopping eventd and
 * the transport.
 */
void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
{
	if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
		DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
		return;
	}

	DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
	ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
	ctdb_stop_recoverd(ctdb);
	ctdb_stop_keepalive(ctdb);
	ctdb_stop_monitoring(ctdb);
	ctdb_release_all_ips(ctdb);
	/* the shutdown event must run while eventd is still up */
	ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
	ctdb_stop_eventd(ctdb);
	if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
		ctdb->methods->shutdown(ctdb);
	}

	DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
	exit(exit_code);
}
/* When forking the main daemon and the child process needs to connect
 * back to the daemon as a client process, this function can be used
 * to change the ctdb context from daemon into client mode.  The child
 * process must be created using ctdb_fork() and not fork() -
 * ctdb_fork() does some necessary housekeeping.
 *
 * Returns 0 on success, -1 if the client connection fails; exits on
 * tevent initialisation failure.
 */
int switch_from_server_to_client(struct ctdb_context *ctdb)
{
	int ret;

	/* get a new event context */
	ctdb->ev = tevent_context_init(ctdb);
	if (ctdb->ev == NULL) {
		DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
		exit(1);
	}
	tevent_loop_allow_nesting(ctdb->ev);

	/* Connect to main CTDB daemon */
	ret = ctdb_socket_connect(ctdb);
	if (ret != 0) {
		DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
		return -1;
	}

	/* as a client we may now send controls to the daemon */
	ctdb->can_send_controls = true;

	return 0;
}