ctdb-daemon: Never release all IPs when DisableIPFailover is set
[Samba.git] / ctdb / server / ctdb_daemon.c
blobfa54d3da5bc60a6ccca1fd208729d1b550468041
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "lib/tdb_wrap/tdb_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
/*
  One entry in the daemon's list of connected clients (ctdb->client_pids).
  An entry is created for every accepted client connection and records the
  peer pid obtained for that connection.
 */
struct ctdb_client_pid_list {
	/* doubly linked list pointers used by the DLIST_* macros */
	struct ctdb_client_pid_list *next, *prev;
	/* owning daemon context */
	struct ctdb_context *ctdb;
	/* pid of the connected peer (0 if it could not be determined) */
	pid_t pid;
	/* the client this entry belongs to */
	struct ctdb_client *client;
};
/* Path of the PID file; while NULL no PID file is created or removed. */
const char *ctdbd_pidfile = NULL;

/* Forward declaration: dispatcher for packets received from a local client. */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
44 static void print_exit_message(void)
46 if (debug_extra != NULL && debug_extra[0] != '\0') {
47 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48 } else {
49 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
51 /* Wait a second to allow pending log messages to be flushed */
52 sleep(1);
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
59 struct timeval t, void *private_data)
61 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
63 if (getpid() != ctdb->ctdbd_pid) {
64 return;
67 event_add_timed(ctdb->ev, ctdb,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick, ctdb);
72 /* Used to trigger a dummy event once per second, to make
73 * detection of hangs more reliable.
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
77 event_add_timed(ctdb->ev, ctdb,
78 timeval_current_ofs(1, 0),
79 ctdb_time_tick, ctdb);
/*
  Kick off the daemon's recurring activities, in this order:
  keepalives, tcp tickle list updates, recovery daemon ping handling and
  the once-per-second timer tick.
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	ctdb_start_keepalive(ctdb);		/* monitor connected/disconnected nodes */
	ctdb_start_tcp_tickle_update(ctdb);	/* periodic update of tcp tickle lists */
	ctdb_control_recd_ping(ctdb);		/* listen for recovery daemon pings */
	ctdb_start_time_tickd(ctdb);		/* listen to timer ticks */
}
97 static void ignore_signal(int signum)
99 struct sigaction act;
101 memset(&act, 0, sizeof(act));
103 act.sa_handler = SIG_IGN;
104 sigemptyset(&act.sa_mask);
105 sigaddset(&act.sa_mask, signum);
106 sigaction(signum, &act, NULL);
111 send a packet to a client
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
115 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116 if (hdr->operation == CTDB_REQ_MESSAGE) {
117 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
119 talloc_free(client);
120 return -1;
123 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
127 message handler for when we are in daemon mode. This redirects the message
128 to the right client
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
131 TDB_DATA data, void *private_data)
133 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134 struct ctdb_req_message *r;
135 int len;
137 /* construct a message to send to the client containing the data */
138 len = offsetof(struct ctdb_req_message, data) + data.dsize;
139 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
140 len, struct ctdb_req_message);
141 CTDB_NO_MEMORY_VOID(ctdb, r);
143 talloc_set_name_const(r, "req_message packet");
145 r->srvid = srvid;
146 r->datalen = data.dsize;
147 memcpy(&r->data[0], data.dptr, data.dsize);
149 daemon_queue_send(client, &r->hdr);
151 talloc_free(r);
155 this is called when the ctdb daemon received a ctdb request to
156 set the srvid from the client
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
160 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
161 int res;
162 if (client == NULL) {
163 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
164 return -1;
166 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
167 if (res != 0) {
168 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
169 (unsigned long long)srvid));
170 } else {
171 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
172 (unsigned long long)srvid));
175 return res;
179 this is called when the ctdb daemon received a ctdb request to
180 remove a srvid from the client
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
184 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185 if (client == NULL) {
186 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
187 return -1;
189 return ctdb_deregister_message_handler(ctdb, srvid, client);
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
193 TDB_DATA *outdata)
195 uint64_t *ids;
196 int i, num_ids;
197 uint8_t *results;
199 if ((indata.dsize % sizeof(uint64_t)) != 0) {
200 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201 "size=%d\n", (int)indata.dsize));
202 return -1;
205 ids = (uint64_t *)indata.dptr;
206 num_ids = indata.dsize / 8;
208 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209 if (results == NULL) {
210 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
211 return -1;
213 for (i=0; i<num_ids; i++) {
214 if (ctdb_check_message_handler(ctdb, ids[i])) {
215 results[i/8] |= (1 << (i%8));
218 outdata->dptr = (uint8_t *)results;
219 outdata->dsize = talloc_get_size(results);
220 return 0;
224 destroy a ctdb_client
226 static int ctdb_client_destructor(struct ctdb_client *client)
228 struct ctdb_db_context *ctdb_db;
230 ctdb_takeover_client_destructor_hook(client);
231 ctdb_reqid_remove(client->ctdb, client->client_id);
232 client->ctdb->num_clients--;
234 if (client->num_persistent_updates != 0) {
235 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
238 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
239 if (ctdb_db) {
240 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241 "commit active. Forcing recovery.\n"));
242 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
245 * trans3 transaction state:
247 * The destructor sets the pointer to NULL.
249 talloc_free(ctdb_db->persistent_state);
252 return 0;
257 this is called when the ctdb daemon received a ctdb request message
258 from a local client over the unix domain socket
260 static void daemon_request_message_from_client(struct ctdb_client *client,
261 struct ctdb_req_message *c)
263 TDB_DATA data;
264 int res;
266 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
267 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
270 /* maybe the message is for another client on this node */
271 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
272 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
273 return;
276 /* its for a remote node */
277 data.dptr = &c->data[0];
278 data.dsize = c->datalen;
279 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
280 c->srvid, data);
281 if (res != 0) {
282 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
283 c->hdr.destnode));
/*
  Per-request state kept by the daemon while a call issued by a local
  client is being processed.
 */
struct daemon_call_state {
	/* client that issued the call */
	struct ctdb_client *client;
	/* reqid from the client's request header, echoed in the reply */
	uint32_t reqid;
	/* the call being executed */
	struct ctdb_call *call;
	/* when processing started; used for latency statistics */
	struct timeval start_time;

	/* readonly request ? */
	uint32_t readonly_fetch;
	/* call id the client originally asked for (set for readonly requests) */
	uint32_t client_callid;
};
300 complete a call from a client
302 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
304 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
305 struct daemon_call_state);
306 struct ctdb_reply_call *r;
307 int res;
308 uint32_t length;
309 struct ctdb_client *client = dstate->client;
310 struct ctdb_db_context *ctdb_db = state->ctdb_db;
312 talloc_steal(client, dstate);
313 talloc_steal(dstate, dstate->call);
315 res = ctdb_daemon_call_recv(state, dstate->call);
316 if (res != 0) {
317 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
318 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
320 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
321 return;
324 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
325 /* If the client asked for readonly FETCH, we remapped this to
326 FETCH_WITH_HEADER when calling the daemon. So we must
327 strip the extra header off the reply data before passing
328 it back to the client.
330 if (dstate->readonly_fetch
331 && dstate->client_callid == CTDB_FETCH_FUNC) {
332 length -= sizeof(struct ctdb_ltdb_header);
335 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
336 length, struct ctdb_reply_call);
337 if (r == NULL) {
338 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
339 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
340 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
341 return;
343 r->hdr.reqid = dstate->reqid;
344 r->status = dstate->call->status;
346 if (dstate->readonly_fetch
347 && dstate->client_callid == CTDB_FETCH_FUNC) {
348 /* client only asked for a FETCH so we must strip off
349 the extra ctdb_ltdb header
351 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
352 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
353 } else {
354 r->datalen = dstate->call->reply_data.dsize;
355 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
358 res = daemon_queue_send(client, &r->hdr);
359 if (res == -1) {
360 /* client is dead - return immediately */
361 return;
363 if (res != 0) {
364 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
366 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
367 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
368 talloc_free(dstate);
/*
  Handle passed along with a requeued packet.  It names the client by id
  instead of by pointer so that the client can be looked up again (and
  found missing) when the packet is eventually processed.
 */
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;
	uint32_t client_id;
};
377 a wrapper to catch disconnected clients
379 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
381 struct ctdb_client *client;
382 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
383 struct ctdb_daemon_packet_wrap);
384 if (w == NULL) {
385 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
386 return;
389 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
390 if (client == NULL) {
391 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
392 w->client_id));
393 talloc_free(w);
394 return;
396 talloc_free(w);
398 /* process it */
399 daemon_incoming_packet(client, hdr);
/* A single call that has been deferred behind an in-flight fetch. */
struct ctdb_deferred_fetch_call {
	/* doubly linked list pointers used by the DLIST_* macros */
	struct ctdb_deferred_fetch_call *next, *prev;
	/* the deferred request packet */
	struct ctdb_req_call *c;
	/* identifies the client that sent the request */
	struct ctdb_daemon_packet_wrap *w;
};
/* Queue of the calls deferred for one key while its fetch is in flight. */
struct ctdb_deferred_fetch_queue {
	/* head of the list of deferred calls, in arrival order */
	struct ctdb_deferred_fetch_call *deferred_calls;
};
/* Argument for reprocess_deferred_call(): one deferred call and its client. */
struct ctdb_deferred_requeue {
	struct ctdb_deferred_fetch_call *dfc;
	struct ctdb_client *client;
};
417 /* called from a timer event and starts reprocessing the deferred call.*/
418 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
419 struct timeval t, void *private_data)
421 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
422 struct ctdb_client *client = dfr->client;
424 talloc_steal(client, dfr->dfc->c);
425 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
426 talloc_free(dfr);
429 /* the referral context is destroyed either after a timeout or when the initial
430 fetch-lock has finished.
431 at this stage, immediately start reprocessing the queued up deferred
432 calls so they get reprocessed immediately (and since we are dmaster at
433 this stage, trigger the waiting smbd processes to pick up and aquire the
434 record right away.
436 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
439 /* need to reprocess the packets from the queue explicitely instead of
440 just using a normal destructor since we want, need, to
441 call the clients in the same oder as the requests queued up
443 while (dfq->deferred_calls != NULL) {
444 struct ctdb_client *client;
445 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
446 struct ctdb_deferred_requeue *dfr;
448 DLIST_REMOVE(dfq->deferred_calls, dfc);
450 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
451 if (client == NULL) {
452 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
453 dfc->w->client_id));
454 continue;
457 /* process it by pushing it back onto the eventloop */
458 dfr = talloc(client, struct ctdb_deferred_requeue);
459 if (dfr == NULL) {
460 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
461 continue;
464 dfr->dfc = talloc_steal(dfr, dfc);
465 dfr->client = client;
467 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
470 return 0;
473 /* insert the new deferral context into the rb tree.
474 there should never be a pre-existing context here, but check for it
475 warn and destroy the previous context if there is already a deferral context
476 for this key.
478 static void *insert_dfq_callback(void *parm, void *data)
480 if (data) {
481 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
482 talloc_free(data);
484 return parm;
/* if the original fetch-lock did not complete within a reasonable time,
   free the deferral context (private_data).  Its destructor then pushes
   all deferred requests back into the event system.
*/
static void dfq_timeout(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	void *deferral_ctx = private_data;

	talloc_free(deferral_ctx);
}
497 /* This function is used in the local daemon to register a KEY in a database
498 for being "fetched"
499 While the remote fetch is in-flight, any futher attempts to re-fetch the
500 same record will be deferred until the fetch completes.
502 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
504 uint32_t *k;
505 struct ctdb_deferred_fetch_queue *dfq;
507 k = ctdb_key_to_idkey(call, call->key);
508 if (k == NULL) {
509 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
510 return -1;
513 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
514 if (dfq == NULL) {
515 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
516 talloc_free(k);
517 return -1;
519 dfq->deferred_calls = NULL;
521 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
523 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
525 /* if the fetch havent completed in 30 seconds, just tear it all down
526 and let it try again as the events are reissued */
527 event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
529 talloc_free(k);
530 return 0;
533 /* check if this is a duplicate request to a fetch already in-flight
534 if it is, make this call deferred to be reprocessed later when
535 the in-flight fetch completes.
537 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
539 uint32_t *k;
540 struct ctdb_deferred_fetch_queue *dfq;
541 struct ctdb_deferred_fetch_call *dfc;
543 k = ctdb_key_to_idkey(c, key);
544 if (k == NULL) {
545 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
546 return -1;
549 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
550 if (dfq == NULL) {
551 talloc_free(k);
552 return -1;
556 talloc_free(k);
558 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
559 if (dfc == NULL) {
560 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
561 return -1;
564 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
565 if (dfc->w == NULL) {
566 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
567 talloc_free(dfc);
568 return -1;
571 dfc->c = talloc_steal(dfc, c);
572 dfc->w->ctdb = ctdb_db->ctdb;
573 dfc->w->client_id = client->client_id;
575 DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
577 return 0;
582 this is called when the ctdb daemon received a ctdb request call
583 from a local client over the unix domain socket
585 static void daemon_request_call_from_client(struct ctdb_client *client,
586 struct ctdb_req_call *c)
588 struct ctdb_call_state *state;
589 struct ctdb_db_context *ctdb_db;
590 struct daemon_call_state *dstate;
591 struct ctdb_call *call;
592 struct ctdb_ltdb_header header;
593 TDB_DATA key, data;
594 int ret;
595 struct ctdb_context *ctdb = client->ctdb;
596 struct ctdb_daemon_packet_wrap *w;
598 CTDB_INCREMENT_STAT(ctdb, total_calls);
599 CTDB_INCREMENT_STAT(ctdb, pending_calls);
601 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
602 if (!ctdb_db) {
603 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
604 c->db_id));
605 CTDB_DECREMENT_STAT(ctdb, pending_calls);
606 return;
609 if (ctdb_db->unhealthy_reason) {
611 * this is just a warning, as the tdb should be empty anyway,
612 * and only persistent databases can be unhealthy, which doesn't
613 * use this code patch
615 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
616 ctdb_db->db_name, ctdb_db->unhealthy_reason));
619 key.dptr = c->data;
620 key.dsize = c->keylen;
622 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
623 CTDB_NO_MEMORY_VOID(ctdb, w);
625 w->ctdb = ctdb;
626 w->client_id = client->client_id;
628 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
629 (struct ctdb_req_header *)c, &data,
630 daemon_incoming_packet_wrap, w, true);
631 if (ret == -2) {
632 /* will retry later */
633 CTDB_DECREMENT_STAT(ctdb, pending_calls);
634 return;
637 talloc_free(w);
639 if (ret != 0) {
640 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
641 CTDB_DECREMENT_STAT(ctdb, pending_calls);
642 return;
646 /* check if this fetch request is a duplicate for a
647 request we already have in flight. If so defer it until
648 the first request completes.
650 if (ctdb->tunable.fetch_collapse == 1) {
651 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
652 ret = ctdb_ltdb_unlock(ctdb_db, key);
653 if (ret != 0) {
654 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
656 CTDB_DECREMENT_STAT(ctdb, pending_calls);
657 return;
661 /* Dont do READONLY if we dont have a tracking database */
662 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
663 c->flags &= ~CTDB_WANT_READONLY;
666 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
667 header.flags &= ~CTDB_REC_RO_FLAGS;
668 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
669 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
670 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
671 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
673 /* and clear out the tracking data */
674 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
675 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
679 /* if we are revoking, we must defer all other calls until the revoke
680 * had completed.
682 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
683 talloc_free(data.dptr);
684 ret = ctdb_ltdb_unlock(ctdb_db, key);
686 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
687 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
689 CTDB_DECREMENT_STAT(ctdb, pending_calls);
690 return;
693 if ((header.dmaster == ctdb->pnn)
694 && (!(c->flags & CTDB_WANT_READONLY))
695 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
696 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
697 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
698 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
700 ret = ctdb_ltdb_unlock(ctdb_db, key);
702 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
703 ctdb_fatal(ctdb, "Failed to start record revoke");
705 talloc_free(data.dptr);
707 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
708 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
711 CTDB_DECREMENT_STAT(ctdb, pending_calls);
712 return;
715 dstate = talloc(client, struct daemon_call_state);
716 if (dstate == NULL) {
717 ret = ctdb_ltdb_unlock(ctdb_db, key);
718 if (ret != 0) {
719 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
722 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
723 CTDB_DECREMENT_STAT(ctdb, pending_calls);
724 return;
726 dstate->start_time = timeval_current();
727 dstate->client = client;
728 dstate->reqid = c->hdr.reqid;
729 talloc_steal(dstate, data.dptr);
731 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
732 if (call == NULL) {
733 ret = ctdb_ltdb_unlock(ctdb_db, key);
734 if (ret != 0) {
735 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
738 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
739 CTDB_DECREMENT_STAT(ctdb, pending_calls);
740 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
741 return;
744 dstate->readonly_fetch = 0;
745 call->call_id = c->callid;
746 call->key = key;
747 call->call_data.dptr = c->data + c->keylen;
748 call->call_data.dsize = c->calldatalen;
749 call->flags = c->flags;
751 if (c->flags & CTDB_WANT_READONLY) {
752 /* client wants readonly record, so translate this into a
753 fetch with header. remember what the client asked for
754 so we can remap the reply back to the proper format for
755 the client in the reply
757 dstate->client_callid = call->call_id;
758 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
759 dstate->readonly_fetch = 1;
762 if (header.dmaster == ctdb->pnn) {
763 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
764 } else {
765 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
766 if (ctdb->tunable.fetch_collapse == 1) {
767 /* This request triggered a remote fetch-lock.
768 set up a deferral for this key so any additional
769 fetch-locks are deferred until the current one
770 finishes.
772 setup_deferred_fetch_locks(ctdb_db, call);
776 ret = ctdb_ltdb_unlock(ctdb_db, key);
777 if (ret != 0) {
778 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
781 if (state == NULL) {
782 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
783 CTDB_DECREMENT_STAT(ctdb, pending_calls);
784 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
785 return;
787 talloc_steal(state, dstate);
788 talloc_steal(client, state);
790 state->async.fn = daemon_call_from_client_callback;
791 state->async.private_data = dstate;
/* Forward declaration: handler for CTDB_REQ_CONTROL packets from a client. */
static void daemon_request_control_from_client(struct ctdb_client *client,
					       struct ctdb_req_control *c);
798 /* data contains a packet from the client */
799 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
801 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
802 TALLOC_CTX *tmp_ctx;
803 struct ctdb_context *ctdb = client->ctdb;
805 /* place the packet as a child of a tmp_ctx. We then use
806 talloc_free() below to free it. If any of the calls want
807 to keep it, then they will steal it somewhere else, and the
808 talloc_free() will be a no-op */
809 tmp_ctx = talloc_new(client);
810 talloc_steal(tmp_ctx, hdr);
812 if (hdr->ctdb_magic != CTDB_MAGIC) {
813 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
814 goto done;
817 if (hdr->ctdb_version != CTDB_PROTOCOL) {
818 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
819 goto done;
822 switch (hdr->operation) {
823 case CTDB_REQ_CALL:
824 CTDB_INCREMENT_STAT(ctdb, client.req_call);
825 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
826 break;
828 case CTDB_REQ_MESSAGE:
829 CTDB_INCREMENT_STAT(ctdb, client.req_message);
830 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
831 break;
833 case CTDB_REQ_CONTROL:
834 CTDB_INCREMENT_STAT(ctdb, client.req_control);
835 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
836 break;
838 default:
839 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
840 hdr->operation));
843 done:
844 talloc_free(tmp_ctx);
848 called when the daemon gets a incoming packet
850 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
852 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
853 struct ctdb_req_header *hdr;
855 if (cnt == 0) {
856 talloc_free(client);
857 return;
860 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
862 if (cnt < sizeof(*hdr)) {
863 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
864 (unsigned)cnt);
865 return;
867 hdr = (struct ctdb_req_header *)data;
868 if (cnt != hdr->length) {
869 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
870 (unsigned)hdr->length, (unsigned)cnt);
871 return;
874 if (hdr->ctdb_magic != CTDB_MAGIC) {
875 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
876 return;
879 if (hdr->ctdb_version != CTDB_PROTOCOL) {
880 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
881 return;
884 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
885 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
886 hdr->srcnode, hdr->destnode));
888 /* it is the responsibility of the incoming packet function to free 'data' */
889 daemon_incoming_packet(client, hdr);
893 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
895 if (client_pid->ctdb->client_pids != NULL) {
896 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
899 return 0;
903 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
904 uint16_t flags, void *private_data)
906 struct sockaddr_un addr;
907 socklen_t len;
908 int fd;
909 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
910 struct ctdb_client *client;
911 struct ctdb_client_pid_list *client_pid;
912 pid_t peer_pid = 0;
914 memset(&addr, 0, sizeof(addr));
915 len = sizeof(addr);
916 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
917 if (fd == -1) {
918 return;
921 set_nonblocking(fd);
922 set_close_on_exec(fd);
924 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
926 client = talloc_zero(ctdb, struct ctdb_client);
927 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
928 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
931 client->ctdb = ctdb;
932 client->fd = fd;
933 client->client_id = ctdb_reqid_new(ctdb, client);
934 client->pid = peer_pid;
936 client_pid = talloc(client, struct ctdb_client_pid_list);
937 if (client_pid == NULL) {
938 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
939 close(fd);
940 talloc_free(client);
941 return;
943 client_pid->ctdb = ctdb;
944 client_pid->pid = peer_pid;
945 client_pid->client = client;
947 DLIST_ADD(ctdb->client_pids, client_pid);
949 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
950 ctdb_daemon_read_cb, client,
951 "client-%u", client->pid);
953 talloc_set_destructor(client, ctdb_client_destructor);
954 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
955 ctdb->num_clients++;
961 create a unix domain socket and bind it
962 return a file descriptor open on the socket
964 static int ux_socket_bind(struct ctdb_context *ctdb)
966 struct sockaddr_un addr;
968 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
969 if (ctdb->daemon.sd == -1) {
970 return -1;
973 memset(&addr, 0, sizeof(addr));
974 addr.sun_family = AF_UNIX;
975 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
977 /* First check if an old ctdbd might be running */
978 if (connect(ctdb->daemon.sd,
979 (struct sockaddr *)&addr, sizeof(addr)) == 0) {
980 DEBUG(DEBUG_CRIT,
981 ("Something is already listening on ctdb socket '%s'\n",
982 ctdb->daemon.name));
983 goto failed;
986 /* Remove any old socket */
987 unlink(ctdb->daemon.name);
989 set_close_on_exec(ctdb->daemon.sd);
990 set_nonblocking(ctdb->daemon.sd);
992 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
993 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
994 goto failed;
997 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
998 chmod(ctdb->daemon.name, 0700) != 0) {
999 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1000 goto failed;
1004 if (listen(ctdb->daemon.sd, 100) != 0) {
1005 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1006 goto failed;
1009 return 0;
1011 failed:
1012 close(ctdb->daemon.sd);
1013 ctdb->daemon.sd = -1;
1014 return -1;
1017 static void initialise_node_flags (struct ctdb_context *ctdb)
1019 if (ctdb->pnn == -1) {
1020 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1023 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1025 /* do we start out in DISABLED mode? */
1026 if (ctdb->start_as_disabled != 0) {
1027 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1028 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1030 /* do we start out in STOPPED mode? */
1031 if (ctdb->start_as_stopped != 0) {
1032 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1033 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1037 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1038 void *private_data)
1040 if (status != 0) {
1041 ctdb_die(ctdb, "Failed to run setup event");
1043 ctdb_run_notification_script(ctdb, "setup");
1045 /* tell all other nodes we've just started up */
1046 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1047 0, CTDB_CONTROL_STARTUP, 0,
1048 CTDB_CTRL_FLAG_NOREPLY,
1049 tdb_null, NULL, NULL);
1051 /* Start the recovery daemon */
1052 if (ctdb_start_recoverd(ctdb) != 0) {
1053 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1054 exit(11);
1057 ctdb_start_periodic_events(ctdb);
1059 ctdb_wait_for_first_recovery(ctdb);
1062 static struct timeval tevent_before_wait_ts;
1063 static struct timeval tevent_after_wait_ts;
1065 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1066 void *private_data)
1068 struct timeval diff;
1069 struct timeval now;
1070 struct ctdb_context *ctdb =
1071 talloc_get_type(private_data, struct ctdb_context);
1073 if (getpid() != ctdb->ctdbd_pid) {
1074 return;
1077 now = timeval_current();
1079 switch (tp) {
1080 case TEVENT_TRACE_BEFORE_WAIT:
1081 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1082 diff = timeval_until(&tevent_after_wait_ts, &now);
1083 if (diff.tv_sec > 3) {
1084 DEBUG(DEBUG_ERR,
1085 ("Handling event took %ld seconds!\n",
1086 diff.tv_sec));
1089 tevent_before_wait_ts = now;
1090 break;
1092 case TEVENT_TRACE_AFTER_WAIT:
1093 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1094 diff = timeval_until(&tevent_before_wait_ts, &now);
1095 if (diff.tv_sec > 3) {
1096 DEBUG(DEBUG_CRIT,
1097 ("No event for %ld seconds!\n",
1098 diff.tv_sec));
1101 tevent_after_wait_ts = now;
1102 break;
1104 default:
1105 /* Do nothing for future tevent trace points */ ;
1109 static void ctdb_remove_pidfile(void)
1111 /* Only the main ctdbd's PID matches the SID */
1112 if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1113 if (unlink(ctdbd_pidfile) == 0) {
1114 DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1115 ctdbd_pidfile));
1116 } else {
1117 DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1118 ctdbd_pidfile));
1123 static void ctdb_create_pidfile(pid_t pid)
1125 if (ctdbd_pidfile != NULL) {
1126 FILE *fp;
1128 fp = fopen(ctdbd_pidfile, "w");
1129 if (fp == NULL) {
1130 DEBUG(DEBUG_ALERT,
1131 ("Failed to open PID file %s\n", ctdbd_pidfile));
1132 exit(11);
1135 fprintf(fp, "%d\n", pid);
1136 fclose(fp);
1137 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1138 atexit(ctdb_remove_pidfile);
1142 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1144 int i, j, count;
1146 /* initialize the vnn mapping table, skipping any deleted nodes */
1147 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1148 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1150 count = 0;
1151 for (i = 0; i < ctdb->num_nodes; i++) {
1152 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1153 count++;
1157 ctdb->vnn_map->generation = INVALID_GENERATION;
1158 ctdb->vnn_map->size = count;
1159 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1160 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1162 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1163 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1164 continue;
1166 ctdb->vnn_map->map[j] = i;
1167 j++;
1171 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1173 int nodeid;
1175 if (ctdb->address == NULL) {
1176 ctdb_fatal(ctdb,
1177 "Can not determine PNN - node address is not set\n");
1180 nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1181 if (nodeid == -1) {
1182 ctdb_fatal(ctdb,
1183 "Can not determine PNN - node address not found in node list\n");
1186 ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1187 DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1191 start the protocol going as a daemon
1193 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1195 int res, ret = -1;
1196 struct fd_event *fde;
1197 const char *domain_socket_name;
1199 /* create a unix domain stream socket to listen to */
1200 res = ux_socket_bind(ctdb);
1201 if (res!=0) {
1202 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1203 exit(10);
1206 if (do_fork && fork()) {
1207 return 0;
1210 tdb_reopen_all(false);
1212 if (do_fork) {
1213 if (setsid() == -1) {
1214 ctdb_die(ctdb, "Failed to setsid()\n");
1216 close(0);
1217 if (open("/dev/null", O_RDONLY) != 0) {
1218 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1219 exit(11);
1222 ignore_signal(SIGPIPE);
1224 ctdb->ctdbd_pid = getpid();
1225 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1226 CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1227 ctdb_create_pidfile(ctdb->ctdbd_pid);
1229 /* Make sure we log something when the daemon terminates.
1230 * This must be the first exit handler to run (so the last to
1231 * be registered.
1233 atexit(print_exit_message);
1235 if (ctdb->do_setsched) {
1236 /* try to set us up as realtime */
1237 if (!set_scheduler()) {
1238 exit(1);
1240 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1243 /* ensure the socket is deleted on exit of the daemon */
1244 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1245 if (domain_socket_name == NULL) {
1246 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1247 exit(12);
1250 ctdb->ev = event_context_init(NULL);
1251 tevent_loop_allow_nesting(ctdb->ev);
1252 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1253 ret = ctdb_init_tevent_logging(ctdb);
1254 if (ret != 0) {
1255 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1256 exit(1);
1259 /* set up a handler to pick up sigchld */
1260 if (ctdb_init_sigchld(ctdb) == NULL) {
1261 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1262 exit(1);
1265 ctdb_set_child_logging(ctdb);
1267 /* initialize statistics collection */
1268 ctdb_statistics_init(ctdb);
1270 /* force initial recovery for election */
1271 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1273 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1274 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1275 if (ret != 0) {
1276 ctdb_die(ctdb, "Failed to run init event\n");
1278 ctdb_run_notification_script(ctdb, "init");
1280 if (strcmp(ctdb->transport, "tcp") == 0) {
1281 ret = ctdb_tcp_init(ctdb);
1283 #ifdef USE_INFINIBAND
1284 if (strcmp(ctdb->transport, "ib") == 0) {
1285 ret = ctdb_ibw_init(ctdb);
1287 #endif
1288 if (ret != 0) {
1289 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1290 return -1;
1293 if (ctdb->methods == NULL) {
1294 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1295 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1298 /* Initialise the transport. This sets the node address if it
1299 * was not set via the command-line. */
1300 if (ctdb->methods->initialise(ctdb) != 0) {
1301 ctdb_fatal(ctdb, "transport failed to initialise");
1304 ctdb_set_my_pnn(ctdb);
1306 initialise_node_flags(ctdb);
1308 if (ctdb->public_addresses_file) {
1309 ret = ctdb_set_public_addresses(ctdb, true);
1310 if (ret == -1) {
1311 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1312 exit(1);
1316 ctdb_initialise_vnn_map(ctdb);
1318 /* attach to existing databases */
1319 if (ctdb_attach_databases(ctdb) != 0) {
1320 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1323 /* start frozen, then let the first election sort things out */
1324 if (!ctdb_blocking_freeze(ctdb)) {
1325 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1328 /* now start accepting clients, only can do this once frozen */
1329 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
1330 EVENT_FD_READ,
1331 ctdb_accept_client, ctdb);
1332 if (fde == NULL) {
1333 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1335 tevent_fd_set_auto_close(fde);
1337 /* release any IPs we hold from previous runs of the daemon */
1338 ctdb_release_all_ips(ctdb);
1340 /* Start the transport */
1341 if (ctdb->methods->start(ctdb) != 0) {
1342 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1343 ctdb_fatal(ctdb, "transport failed to start");
1346 /* Recovery daemon and timed events are started from the
1347 * callback, only after the setup event completes
1348 * successfully.
1350 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1351 ret = ctdb_event_script_callback(ctdb,
1352 ctdb,
1353 ctdb_setup_event_callback,
1354 ctdb,
1355 CTDB_EVENT_SETUP,
1356 "%s",
1357 "");
1358 if (ret != 0) {
1359 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1360 exit(1);
1363 lockdown_memory(ctdb->valgrinding);
1365 /* go into a wait loop to allow other nodes to complete */
1366 event_loop_wait(ctdb->ev);
1368 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1369 exit(1);
1373 allocate a packet for use in daemon<->daemon communication
1375 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1376 TALLOC_CTX *mem_ctx,
1377 enum ctdb_operation operation,
1378 size_t length, size_t slength,
1379 const char *type)
1381 int size;
1382 struct ctdb_req_header *hdr;
1384 length = MAX(length, slength);
1385 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1387 if (ctdb->methods == NULL) {
1388 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1389 operation, (unsigned)length));
1390 return NULL;
1393 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1394 if (hdr == NULL) {
1395 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1396 operation, (unsigned)length));
1397 return NULL;
1399 talloc_set_name_const(hdr, type);
1400 memset(hdr, 0, slength);
1401 hdr->length = length;
1402 hdr->operation = operation;
1403 hdr->ctdb_magic = CTDB_MAGIC;
1404 hdr->ctdb_version = CTDB_PROTOCOL;
1405 hdr->generation = ctdb->vnn_map->generation;
1406 hdr->srcnode = ctdb->pnn;
1408 return hdr;
/*
  State kept for a control that a local client has asked the daemon to
  send on its behalf.
 */
struct daemon_control_state {
	/* links for the destination node's pending_controls list */
	struct daemon_control_state *next, *prev;
	/* the client that issued the control */
	struct ctdb_client *client;
	/* the original request, owned by this state */
	struct ctdb_req_control *c;
	/* request id from the client's request header, echoed in the reply */
	uint32_t reqid;
	/* destination node, or NULL if the destination is not a valid PNN */
	struct ctdb_node *node;
};
1420 callback when a control reply comes in
1422 static void daemon_control_callback(struct ctdb_context *ctdb,
1423 int32_t status, TDB_DATA data,
1424 const char *errormsg,
1425 void *private_data)
1427 struct daemon_control_state *state = talloc_get_type(private_data,
1428 struct daemon_control_state);
1429 struct ctdb_client *client = state->client;
1430 struct ctdb_reply_control *r;
1431 size_t len;
1432 int ret;
1434 /* construct a message to send to the client containing the data */
1435 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1436 if (errormsg) {
1437 len += strlen(errormsg);
1439 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1440 struct ctdb_reply_control);
1441 CTDB_NO_MEMORY_VOID(ctdb, r);
1443 r->hdr.reqid = state->reqid;
1444 r->status = status;
1445 r->datalen = data.dsize;
1446 r->errorlen = 0;
1447 memcpy(&r->data[0], data.dptr, data.dsize);
1448 if (errormsg) {
1449 r->errorlen = strlen(errormsg);
1450 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1453 ret = daemon_queue_send(client, &r->hdr);
1454 if (ret != -1) {
1455 talloc_free(state);
1460 fail all pending controls to a disconnected node
1462 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1464 struct daemon_control_state *state;
1465 while ((state = node->pending_controls)) {
1466 DLIST_REMOVE(node->pending_controls, state);
1467 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1468 "node is disconnected", state);
1473 destroy a daemon_control_state
1475 static int daemon_control_destructor(struct daemon_control_state *state)
1477 if (state->node) {
1478 DLIST_REMOVE(state->node->pending_controls, state);
1480 return 0;
1484 this is called when the ctdb daemon received a ctdb request control
1485 from a local client over the unix domain socket
1487 static void daemon_request_control_from_client(struct ctdb_client *client,
1488 struct ctdb_req_control *c)
1490 TDB_DATA data;
1491 int res;
1492 struct daemon_control_state *state;
1493 TALLOC_CTX *tmp_ctx = talloc_new(client);
1495 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1496 c->hdr.destnode = client->ctdb->pnn;
1499 state = talloc(client, struct daemon_control_state);
1500 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1502 state->client = client;
1503 state->c = talloc_steal(state, c);
1504 state->reqid = c->hdr.reqid;
1505 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1506 state->node = client->ctdb->nodes[c->hdr.destnode];
1507 DLIST_ADD(state->node->pending_controls, state);
1508 } else {
1509 state->node = NULL;
1512 talloc_set_destructor(state, daemon_control_destructor);
1514 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1515 talloc_steal(tmp_ctx, state);
1518 data.dptr = &c->data[0];
1519 data.dsize = c->datalen;
1520 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1521 c->srvid, c->opcode, client->client_id,
1522 c->flags,
1523 data, daemon_control_callback,
1524 state);
1525 if (res != 0) {
1526 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1527 c->hdr.destnode));
1530 talloc_free(tmp_ctx);
1534 register a call function
1536 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1537 ctdb_fn_t fn, int id)
1539 struct ctdb_registered_call *call;
1540 struct ctdb_db_context *ctdb_db;
1542 ctdb_db = find_ctdb_db(ctdb, db_id);
1543 if (ctdb_db == NULL) {
1544 return -1;
1547 call = talloc(ctdb_db, struct ctdb_registered_call);
1548 call->fn = fn;
1549 call->id = id;
1551 DLIST_ADD(ctdb_db->calls, call);
1552 return 0;
1558 this local messaging handler is ugly, but is needed to prevent
1559 recursion in ctdb_send_message() when the destination node is the
1560 same as the source node
1562 struct ctdb_local_message {
1563 struct ctdb_context *ctdb;
1564 uint64_t srvid;
1565 TDB_DATA data;
1568 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1569 struct timeval t, void *private_data)
1571 struct ctdb_local_message *m = talloc_get_type(private_data,
1572 struct ctdb_local_message);
1573 int res;
1575 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1576 if (res != 0) {
1577 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1578 (unsigned long long)m->srvid));
1580 talloc_free(m);
1583 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1585 struct ctdb_local_message *m;
1586 m = talloc(ctdb, struct ctdb_local_message);
1587 CTDB_NO_MEMORY(ctdb, m);
1589 m->ctdb = ctdb;
1590 m->srvid = srvid;
1591 m->data = data;
1592 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1593 if (m->data.dptr == NULL) {
1594 talloc_free(m);
1595 return -1;
1598 /* this needs to be done as an event to prevent recursion */
1599 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1600 return 0;
1604 send a ctdb message
1606 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1607 uint64_t srvid, TDB_DATA data)
1609 struct ctdb_req_message *r;
1610 int len;
1612 if (ctdb->methods == NULL) {
1613 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1614 return -1;
1617 /* see if this is a message to ourselves */
1618 if (pnn == ctdb->pnn) {
1619 return ctdb_local_message(ctdb, srvid, data);
1622 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1623 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1624 struct ctdb_req_message);
1625 CTDB_NO_MEMORY(ctdb, r);
1627 r->hdr.destnode = pnn;
1628 r->srvid = srvid;
1629 r->datalen = data.dsize;
1630 memcpy(&r->data[0], data.dptr, data.dsize);
1632 ctdb_queue_packet(ctdb, &r->hdr);
1634 talloc_free(r);
1635 return 0;
1640 struct ctdb_client_notify_list {
1641 struct ctdb_client_notify_list *next, *prev;
1642 struct ctdb_context *ctdb;
1643 uint64_t srvid;
1644 TDB_DATA data;
1648 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1650 int ret;
1652 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1654 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1655 if (ret != 0) {
1656 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1659 return 0;
1662 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1664 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1665 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1666 struct ctdb_client_notify_list *nl;
1668 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1670 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1671 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1672 return -1;
1675 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1676 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1677 return -1;
1681 if (client == NULL) {
1682 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1683 return -1;
1686 for(nl=client->notify; nl; nl=nl->next) {
1687 if (nl->srvid == notify->srvid) {
1688 break;
1691 if (nl != NULL) {
1692 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1693 return -1;
1696 nl = talloc(client, struct ctdb_client_notify_list);
1697 CTDB_NO_MEMORY(ctdb, nl);
1698 nl->ctdb = ctdb;
1699 nl->srvid = notify->srvid;
1700 nl->data.dsize = notify->len;
1701 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1702 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1703 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1705 DLIST_ADD(client->notify, nl);
1706 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1708 return 0;
1711 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1713 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1714 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1715 struct ctdb_client_notify_list *nl;
1717 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1719 if (client == NULL) {
1720 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1721 return -1;
1724 for(nl=client->notify; nl; nl=nl->next) {
1725 if (nl->srvid == notify->srvid) {
1726 break;
1729 if (nl == NULL) {
1730 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1731 return -1;
1734 DLIST_REMOVE(client->notify, nl);
1735 talloc_set_destructor(nl, NULL);
1736 talloc_free(nl);
1738 return 0;
1741 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1743 struct ctdb_client_pid_list *client_pid;
1745 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1746 if (client_pid->pid == pid) {
1747 return client_pid->client;
1750 return NULL;
1754 /* This control is used by samba when probing if a process (of a samba daemon)
1755 exists on the node.
1756 Samba does this when it needs/wants to check if a subrecord in one of the
1757 databases is still valied, or if it is stale and can be removed.
1758 If the node is in unhealthy or stopped state we just kill of the samba
1759 process holding htis sub-record and return to the calling samba that
1760 the process does not exist.
1761 This allows us to forcefully recall subrecords registered by samba processes
1762 on banned and stopped nodes.
1764 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1766 struct ctdb_client *client;
1768 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1769 client = ctdb_find_client_by_pid(ctdb, pid);
1770 if (client != NULL) {
1771 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1772 talloc_free(client);
1774 return -1;
1777 return kill(pid, 0);
1780 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1782 struct ctdb_node_map *node_map = NULL;
1784 CHECK_CONTROL_DATA_SIZE(0);
1786 node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1787 if (node_map == NULL) {
1788 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1789 return -1;
1792 outdata->dptr = (unsigned char *)node_map;
1793 outdata->dsize = talloc_get_size(outdata->dptr);
1795 return 0;
1798 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1800 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1801 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1802 return;
1805 DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1806 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1807 ctdb_stop_recoverd(ctdb);
1808 ctdb_stop_keepalive(ctdb);
1809 ctdb_stop_monitoring(ctdb);
1810 ctdb_release_all_ips(ctdb);
1811 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1812 if (ctdb->methods != NULL) {
1813 ctdb->methods->shutdown(ctdb);
1816 DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1817 exit(exit_code);