s3: libsmb: Implement cli_smb2_setatr() by calling cli_smb2_setpathinfo().
[Samba.git] / ctdb / server / ctdb_daemon.c
blobb5cee615e910934e9c13212222330796f3c2d142
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/blocking.h"
38 #include "ctdb_version.h"
39 #include "ctdb_private.h"
40 #include "ctdb_client.h"
42 #include "common/rb_tree.h"
43 #include "common/reqid.h"
44 #include "common/system.h"
45 #include "common/common.h"
46 #include "common/logging.h"
47 #include "common/pidfile.h"
48 #include "common/sock_io.h"
/* One entry in the ctdb->client_pids list, created per accepted client. */
struct ctdb_client_pid_list {
	/* list linkage */
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	/* daemon context whose client_pids list holds this entry */
	struct ctdb_context *ctdb;
	/* pid recorded for the connected peer */
	pid_t pid;
	/* the client connection this entry belongs to */
	struct ctdb_client *client;
};
/* Path of the PID file; ctdb_create_pidfile() does nothing while it is NULL */
const char *ctdbd_pidfile = NULL;

/* Handle of the created PID file, released by ctdb_remove_pidfile() */
static struct pidfile_context *ctdbd_pidfile_ctx = NULL;

static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);

/* PID that print_exit_message() compares against getpid() */
static pid_t __ctdbd_pid;
64 static void print_exit_message(void)
66 if (getpid() == __ctdbd_pid) {
67 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69 /* Wait a second to allow pending log messages to be flushed */
70 sleep(1);
76 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
77 struct timeval t, void *private_data)
79 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
81 if (getpid() != ctdb->ctdbd_pid) {
82 return;
85 tevent_add_timer(ctdb->ev, ctdb,
86 timeval_current_ofs(1, 0),
87 ctdb_time_tick, ctdb);
90 /* Used to trigger a dummy event once per second, to make
91 * detection of hangs more reliable.
93 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
95 tevent_add_timer(ctdb->ev, ctdb,
96 timeval_current_ofs(1, 0),
97 ctdb_time_tick, ctdb);
/*
  Kick off the daemon's recurring background activities, in this order:
  keepalives, tcp tickle list updates, recovery daemon ping handling and
  the once-per-second time tick.
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* monitoring for connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* periodic update of tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* listening for recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* listening to timer ticks */
	ctdb_start_time_tickd(ctdb);
}
115 static void ignore_signal(int signum)
117 struct sigaction act;
119 memset(&act, 0, sizeof(act));
121 act.sa_handler = SIG_IGN;
122 sigemptyset(&act.sa_mask);
123 sigaddset(&act.sa_mask, signum);
124 sigaction(signum, &act, NULL);
129 send a packet to a client
131 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
133 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
134 if (hdr->operation == CTDB_REQ_MESSAGE) {
135 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
136 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
137 talloc_free(client);
138 return -1;
141 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
145 message handler for when we are in daemon mode. This redirects the message
146 to the right client
148 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
149 void *private_data)
151 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
152 struct ctdb_req_message_old *r;
153 int len;
155 /* construct a message to send to the client containing the data */
156 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
157 r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
158 len, struct ctdb_req_message_old);
159 CTDB_NO_MEMORY_VOID(client->ctdb, r);
161 talloc_set_name_const(r, "req_message packet");
163 r->srvid = srvid;
164 r->datalen = data.dsize;
165 memcpy(&r->data[0], data.dptr, data.dsize);
167 daemon_queue_send(client, &r->hdr);
169 talloc_free(r);
173 this is called when the ctdb daemon received a ctdb request to
174 set the srvid from the client
176 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
178 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
179 int res;
180 if (client == NULL) {
181 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
182 return -1;
184 res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
185 client);
186 if (res != 0) {
187 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
188 (unsigned long long)srvid));
189 } else {
190 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
191 (unsigned long long)srvid));
194 return res;
198 this is called when the ctdb daemon received a ctdb request to
199 remove a srvid from the client
201 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
203 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
204 if (client == NULL) {
205 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
206 return -1;
208 return srvid_deregister(ctdb->srv, srvid, client);
211 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
212 TDB_DATA *outdata)
214 uint64_t *ids;
215 int i, num_ids;
216 uint8_t *results;
218 if ((indata.dsize % sizeof(uint64_t)) != 0) {
219 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
220 "size=%d\n", (int)indata.dsize));
221 return -1;
224 ids = (uint64_t *)indata.dptr;
225 num_ids = indata.dsize / 8;
227 results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
228 if (results == NULL) {
229 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
230 return -1;
232 for (i=0; i<num_ids; i++) {
233 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
234 results[i/8] |= (1 << (i%8));
237 outdata->dptr = (uint8_t *)results;
238 outdata->dsize = talloc_get_size(results);
239 return 0;
243 destroy a ctdb_client
245 static int ctdb_client_destructor(struct ctdb_client *client)
247 struct ctdb_db_context *ctdb_db;
249 ctdb_takeover_client_destructor_hook(client);
250 reqid_remove(client->ctdb->idr, client->client_id);
251 client->ctdb->num_clients--;
253 if (client->num_persistent_updates != 0) {
254 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
257 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
258 if (ctdb_db) {
259 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260 "commit active. Forcing recovery.\n"));
261 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
264 * trans3 transaction state:
266 * The destructor sets the pointer to NULL.
268 talloc_free(ctdb_db->persistent_state);
271 return 0;
276 this is called when the ctdb daemon received a ctdb request message
277 from a local client over the unix domain socket
279 static void daemon_request_message_from_client(struct ctdb_client *client,
280 struct ctdb_req_message_old *c)
282 TDB_DATA data;
283 int res;
285 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
289 /* maybe the message is for another client on this node */
290 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292 return;
295 /* its for a remote node */
296 data.dptr = &c->data[0];
297 data.dsize = c->datalen;
298 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
299 c->srvid, data);
300 if (res != 0) {
301 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
302 c->hdr.destnode));
/* Per-call bookkeeping kept while a client's call is being processed. */
struct daemon_call_state {
	/* client that issued the call */
	struct ctdb_client *client;
	/* reqid copied from the client's request, echoed in the reply */
	uint32_t reqid;
	struct ctdb_call *call;
	/* when processing started; used for latency accounting */
	struct timeval start_time;

	/* readonly request ? */
	uint32_t readonly_fetch;
	/* call id the client originally asked for (set for readonly requests) */
	uint32_t client_callid;
};
319 complete a call from a client
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
323 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
324 struct daemon_call_state);
325 struct ctdb_reply_call_old *r;
326 int res;
327 uint32_t length;
328 struct ctdb_client *client = dstate->client;
329 struct ctdb_db_context *ctdb_db = state->ctdb_db;
331 talloc_steal(client, dstate);
332 talloc_steal(dstate, dstate->call);
334 res = ctdb_daemon_call_recv(state, dstate->call);
335 if (res != 0) {
336 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
339 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340 return;
343 length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344 /* If the client asked for readonly FETCH, we remapped this to
345 FETCH_WITH_HEADER when calling the daemon. So we must
346 strip the extra header off the reply data before passing
347 it back to the client.
349 if (dstate->readonly_fetch
350 && dstate->client_callid == CTDB_FETCH_FUNC) {
351 length -= sizeof(struct ctdb_ltdb_header);
354 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
355 length, struct ctdb_reply_call_old);
356 if (r == NULL) {
357 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
360 return;
362 r->hdr.reqid = dstate->reqid;
363 r->status = dstate->call->status;
365 if (dstate->readonly_fetch
366 && dstate->client_callid == CTDB_FETCH_FUNC) {
367 /* client only asked for a FETCH so we must strip off
368 the extra ctdb_ltdb header
370 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
372 } else {
373 r->datalen = dstate->call->reply_data.dsize;
374 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
377 res = daemon_queue_send(client, &r->hdr);
378 if (res == -1) {
379 /* client is dead - return immediately */
380 return;
382 if (res != 0) {
383 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
385 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
387 talloc_free(dstate);
/*
  Identifies a client by id rather than by pointer, so that a packet
  replayed later can detect that the client has since disconnected.
 */
struct ctdb_daemon_packet_wrap {
	/* daemon context whose idr resolves client_id */
	struct ctdb_context *ctdb;
	/* request id of the client the packet came from */
	uint32_t client_id;
};
396 a wrapper to catch disconnected clients
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
400 struct ctdb_client *client;
401 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
402 struct ctdb_daemon_packet_wrap);
403 if (w == NULL) {
404 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405 return;
408 client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409 if (client == NULL) {
410 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
411 w->client_id));
412 talloc_free(w);
413 return;
415 talloc_free(w);
417 /* process it */
418 daemon_incoming_packet(client, hdr);
/* A client call parked on a deferred fetch queue until it can be replayed. */
struct ctdb_deferred_fetch_call {
	/* list linkage within the queue's deferred_calls list */
	struct ctdb_deferred_fetch_call *next;
	struct ctdb_deferred_fetch_call *prev;
	/* the deferred request packet */
	struct ctdb_req_call_old *c;
	/* identifies the client that sent the request */
	struct ctdb_daemon_packet_wrap *w;
};
/* Queue of calls deferred behind one in-flight fetch. */
struct ctdb_deferred_fetch_queue {
	/* deferred calls, kept in arrival order */
	struct ctdb_deferred_fetch_call *deferred_calls;
};
/* Timer payload used to replay one deferred call for a client. */
struct ctdb_deferred_requeue {
	/* the deferred call being replayed */
	struct ctdb_deferred_fetch_call *dfc;
	/* client the call is replayed for */
	struct ctdb_client *client;
};
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438 struct tevent_timer *te,
439 struct timeval t, void *private_data)
441 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442 struct ctdb_client *client = dfr->client;
444 talloc_steal(client, dfr->dfc->c);
445 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446 talloc_free(dfr);
449 /* the referral context is destroyed either after a timeout or when the initial
450 fetch-lock has finished.
451 at this stage, immediately start reprocessing the queued up deferred
452 calls so they get reprocessed immediately (and since we are dmaster at
453 this stage, trigger the waiting smbd processes to pick up and aquire the
454 record right away.
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
459 /* need to reprocess the packets from the queue explicitely instead of
460 just using a normal destructor since we want, need, to
461 call the clients in the same oder as the requests queued up
463 while (dfq->deferred_calls != NULL) {
464 struct ctdb_client *client;
465 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466 struct ctdb_deferred_requeue *dfr;
468 DLIST_REMOVE(dfq->deferred_calls, dfc);
470 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471 if (client == NULL) {
472 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
473 dfc->w->client_id));
474 continue;
477 /* process it by pushing it back onto the eventloop */
478 dfr = talloc(client, struct ctdb_deferred_requeue);
479 if (dfr == NULL) {
480 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481 continue;
484 dfr->dfc = talloc_steal(dfr, dfc);
485 dfr->client = client;
487 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488 reprocess_deferred_call, dfr);
491 return 0;
494 /* insert the new deferral context into the rb tree.
495 there should never be a pre-existing context here, but check for it
496 warn and destroy the previous context if there is already a deferral context
497 for this key.
499 static void *insert_dfq_callback(void *parm, void *data)
501 if (data) {
502 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
503 talloc_free(data);
505 return parm;
/* if the original fetch-lock did not complete within a reasonable time,
   free the deferral context passed as private_data.  Its destructor then
   re-inserts all deferred requests into the event system.
*/
static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
			struct timeval t, void *private_data)
{
	void *expired_dfq = private_data;

	talloc_free(expired_dfq);
}
518 /* This function is used in the local daemon to register a KEY in a database
519 for being "fetched"
520 While the remote fetch is in-flight, any futher attempts to re-fetch the
521 same record will be deferred until the fetch completes.
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
525 uint32_t *k;
526 struct ctdb_deferred_fetch_queue *dfq;
528 k = ctdb_key_to_idkey(call, call->key);
529 if (k == NULL) {
530 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531 return -1;
534 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
535 if (dfq == NULL) {
536 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537 talloc_free(k);
538 return -1;
540 dfq->deferred_calls = NULL;
542 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
544 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
546 /* if the fetch havent completed in 30 seconds, just tear it all down
547 and let it try again as the events are reissued */
548 tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
549 dfq_timeout, dfq);
551 talloc_free(k);
552 return 0;
555 /* check if this is a duplicate request to a fetch already in-flight
556 if it is, make this call deferred to be reprocessed later when
557 the in-flight fetch completes.
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
561 uint32_t *k;
562 struct ctdb_deferred_fetch_queue *dfq;
563 struct ctdb_deferred_fetch_call *dfc;
565 k = ctdb_key_to_idkey(c, key);
566 if (k == NULL) {
567 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568 return -1;
571 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
572 if (dfq == NULL) {
573 talloc_free(k);
574 return -1;
578 talloc_free(k);
580 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
581 if (dfc == NULL) {
582 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583 return -1;
586 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587 if (dfc->w == NULL) {
588 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
589 talloc_free(dfc);
590 return -1;
593 dfc->c = talloc_steal(dfc, c);
594 dfc->w->ctdb = ctdb_db->ctdb;
595 dfc->w->client_id = client->client_id;
597 DLIST_ADD_END(dfq->deferred_calls, dfc);
599 return 0;
604 this is called when the ctdb daemon received a ctdb request call
605 from a local client over the unix domain socket
607 static void daemon_request_call_from_client(struct ctdb_client *client,
608 struct ctdb_req_call_old *c)
610 struct ctdb_call_state *state;
611 struct ctdb_db_context *ctdb_db;
612 struct daemon_call_state *dstate;
613 struct ctdb_call *call;
614 struct ctdb_ltdb_header header;
615 TDB_DATA key, data;
616 int ret;
617 struct ctdb_context *ctdb = client->ctdb;
618 struct ctdb_daemon_packet_wrap *w;
620 CTDB_INCREMENT_STAT(ctdb, total_calls);
621 CTDB_INCREMENT_STAT(ctdb, pending_calls);
623 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
624 if (!ctdb_db) {
625 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
626 c->db_id));
627 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628 return;
631 if (ctdb_db->unhealthy_reason) {
633 * this is just a warning, as the tdb should be empty anyway,
634 * and only persistent databases can be unhealthy, which doesn't
635 * use this code patch
637 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638 ctdb_db->db_name, ctdb_db->unhealthy_reason));
641 key.dptr = c->data;
642 key.dsize = c->keylen;
644 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645 CTDB_NO_MEMORY_VOID(ctdb, w);
647 w->ctdb = ctdb;
648 w->client_id = client->client_id;
650 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
651 (struct ctdb_req_header *)c, &data,
652 daemon_incoming_packet_wrap, w, true);
653 if (ret == -2) {
654 /* will retry later */
655 CTDB_DECREMENT_STAT(ctdb, pending_calls);
656 return;
659 talloc_free(w);
661 if (ret != 0) {
662 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664 return;
668 /* check if this fetch request is a duplicate for a
669 request we already have in flight. If so defer it until
670 the first request completes.
672 if (ctdb->tunable.fetch_collapse == 1) {
673 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674 ret = ctdb_ltdb_unlock(ctdb_db, key);
675 if (ret != 0) {
676 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
678 CTDB_DECREMENT_STAT(ctdb, pending_calls);
679 return;
683 /* Dont do READONLY if we don't have a tracking database */
684 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
685 c->flags &= ~CTDB_WANT_READONLY;
688 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
689 header.flags &= ~CTDB_REC_RO_FLAGS;
690 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
691 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
692 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
693 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
695 /* and clear out the tracking data */
696 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
697 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
701 /* if we are revoking, we must defer all other calls until the revoke
702 * had completed.
704 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
705 talloc_free(data.dptr);
706 ret = ctdb_ltdb_unlock(ctdb_db, key);
708 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
709 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
711 CTDB_DECREMENT_STAT(ctdb, pending_calls);
712 return;
715 if ((header.dmaster == ctdb->pnn)
716 && (!(c->flags & CTDB_WANT_READONLY))
717 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
718 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
719 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
720 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
722 ret = ctdb_ltdb_unlock(ctdb_db, key);
724 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
725 ctdb_fatal(ctdb, "Failed to start record revoke");
727 talloc_free(data.dptr);
729 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
730 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
733 CTDB_DECREMENT_STAT(ctdb, pending_calls);
734 return;
737 dstate = talloc(client, struct daemon_call_state);
738 if (dstate == NULL) {
739 ret = ctdb_ltdb_unlock(ctdb_db, key);
740 if (ret != 0) {
741 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
744 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
745 CTDB_DECREMENT_STAT(ctdb, pending_calls);
746 return;
748 dstate->start_time = timeval_current();
749 dstate->client = client;
750 dstate->reqid = c->hdr.reqid;
751 talloc_steal(dstate, data.dptr);
753 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
754 if (call == NULL) {
755 ret = ctdb_ltdb_unlock(ctdb_db, key);
756 if (ret != 0) {
757 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
760 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
761 CTDB_DECREMENT_STAT(ctdb, pending_calls);
762 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
763 return;
766 dstate->readonly_fetch = 0;
767 call->call_id = c->callid;
768 call->key = key;
769 call->call_data.dptr = c->data + c->keylen;
770 call->call_data.dsize = c->calldatalen;
771 call->flags = c->flags;
773 if (c->flags & CTDB_WANT_READONLY) {
774 /* client wants readonly record, so translate this into a
775 fetch with header. remember what the client asked for
776 so we can remap the reply back to the proper format for
777 the client in the reply
779 dstate->client_callid = call->call_id;
780 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
781 dstate->readonly_fetch = 1;
784 if (header.dmaster == ctdb->pnn) {
785 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
786 } else {
787 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
788 if (ctdb->tunable.fetch_collapse == 1) {
789 /* This request triggered a remote fetch-lock.
790 set up a deferral for this key so any additional
791 fetch-locks are deferred until the current one
792 finishes.
794 setup_deferred_fetch_locks(ctdb_db, call);
798 ret = ctdb_ltdb_unlock(ctdb_db, key);
799 if (ret != 0) {
800 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
803 if (state == NULL) {
804 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
805 CTDB_DECREMENT_STAT(ctdb, pending_calls);
806 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
807 return;
809 talloc_steal(state, dstate);
810 talloc_steal(client, state);
812 state->async.fn = daemon_call_from_client_callback;
813 state->async.private_data = dstate;
817 static void daemon_request_control_from_client(struct ctdb_client *client,
818 struct ctdb_req_control_old *c);
820 /* data contains a packet from the client */
821 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
823 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
824 TALLOC_CTX *tmp_ctx;
825 struct ctdb_context *ctdb = client->ctdb;
827 /* place the packet as a child of a tmp_ctx. We then use
828 talloc_free() below to free it. If any of the calls want
829 to keep it, then they will steal it somewhere else, and the
830 talloc_free() will be a no-op */
831 tmp_ctx = talloc_new(client);
832 talloc_steal(tmp_ctx, hdr);
834 if (hdr->ctdb_magic != CTDB_MAGIC) {
835 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
836 goto done;
839 if (hdr->ctdb_version != CTDB_PROTOCOL) {
840 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
841 goto done;
844 switch (hdr->operation) {
845 case CTDB_REQ_CALL:
846 CTDB_INCREMENT_STAT(ctdb, client.req_call);
847 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
848 break;
850 case CTDB_REQ_MESSAGE:
851 CTDB_INCREMENT_STAT(ctdb, client.req_message);
852 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
853 break;
855 case CTDB_REQ_CONTROL:
856 CTDB_INCREMENT_STAT(ctdb, client.req_control);
857 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
858 break;
860 default:
861 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
862 hdr->operation));
865 done:
866 talloc_free(tmp_ctx);
870 called when the daemon gets a incoming packet
872 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
874 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
875 struct ctdb_req_header *hdr;
877 if (cnt == 0) {
878 talloc_free(client);
879 return;
882 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
884 if (cnt < sizeof(*hdr)) {
885 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
886 (unsigned)cnt);
887 return;
889 hdr = (struct ctdb_req_header *)data;
890 if (cnt != hdr->length) {
891 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
892 (unsigned)hdr->length, (unsigned)cnt);
893 return;
896 if (hdr->ctdb_magic != CTDB_MAGIC) {
897 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
898 return;
901 if (hdr->ctdb_version != CTDB_PROTOCOL) {
902 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
903 return;
906 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
907 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
908 hdr->srcnode, hdr->destnode));
910 /* it is the responsibility of the incoming packet function to free 'data' */
911 daemon_incoming_packet(client, hdr);
915 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
917 if (client_pid->ctdb->client_pids != NULL) {
918 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
921 return 0;
925 static void ctdb_accept_client(struct tevent_context *ev,
926 struct tevent_fd *fde, uint16_t flags,
927 void *private_data)
929 struct sockaddr_un addr;
930 socklen_t len;
931 int fd;
932 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
933 struct ctdb_client *client;
934 struct ctdb_client_pid_list *client_pid;
935 pid_t peer_pid = 0;
936 int ret;
938 memset(&addr, 0, sizeof(addr));
939 len = sizeof(addr);
940 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
941 if (fd == -1) {
942 return;
945 ret = set_blocking(fd, false);
946 if (ret != 0) {
947 DEBUG(DEBUG_ERR,
948 (__location__
949 " failed to set socket non-blocking (%s)\n",
950 strerror(errno)));
951 close(fd);
952 return;
955 set_close_on_exec(fd);
957 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
959 client = talloc_zero(ctdb, struct ctdb_client);
960 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
961 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
964 client->ctdb = ctdb;
965 client->fd = fd;
966 client->client_id = reqid_new(ctdb->idr, client);
967 client->pid = peer_pid;
969 client_pid = talloc(client, struct ctdb_client_pid_list);
970 if (client_pid == NULL) {
971 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
972 close(fd);
973 talloc_free(client);
974 return;
976 client_pid->ctdb = ctdb;
977 client_pid->pid = peer_pid;
978 client_pid->client = client;
980 DLIST_ADD(ctdb->client_pids, client_pid);
982 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
983 ctdb_daemon_read_cb, client,
984 "client-%u", client->pid);
986 talloc_set_destructor(client, ctdb_client_destructor);
987 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
988 ctdb->num_clients++;
994 create a unix domain socket and bind it
995 return a file descriptor open on the socket
997 static int ux_socket_bind(struct ctdb_context *ctdb)
999 struct sockaddr_un addr;
1000 int ret;
1002 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1003 if (ctdb->daemon.sd == -1) {
1004 return -1;
1007 memset(&addr, 0, sizeof(addr));
1008 addr.sun_family = AF_UNIX;
1009 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1011 if (! sock_clean(ctdb->daemon.name)) {
1012 return -1;
1015 set_close_on_exec(ctdb->daemon.sd);
1017 ret = set_blocking(ctdb->daemon.sd, false);
1018 if (ret != 0) {
1019 DEBUG(DEBUG_ERR,
1020 (__location__
1021 " failed to set socket non-blocking (%s)\n",
1022 strerror(errno)));
1023 goto failed;
1026 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1027 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1028 goto failed;
1031 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1032 chmod(ctdb->daemon.name, 0700) != 0) {
1033 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1034 goto failed;
1038 if (listen(ctdb->daemon.sd, 100) != 0) {
1039 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1040 goto failed;
1043 DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1044 ctdb->daemon.name));
1045 return 0;
1047 failed:
1048 close(ctdb->daemon.sd);
1049 ctdb->daemon.sd = -1;
1050 return -1;
1053 static void initialise_node_flags (struct ctdb_context *ctdb)
1055 if (ctdb->pnn == -1) {
1056 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1059 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1061 /* do we start out in DISABLED mode? */
1062 if (ctdb->start_as_disabled != 0) {
1063 DEBUG(DEBUG_ERR,
1064 ("This node is configured to start in DISABLED state\n"));
1065 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1067 /* do we start out in STOPPED mode? */
1068 if (ctdb->start_as_stopped != 0) {
1069 DEBUG(DEBUG_ERR,
1070 ("This node is configured to start in STOPPED state\n"));
1071 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1075 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1076 void *private_data)
1078 if (status != 0) {
1079 ctdb_die(ctdb, "Failed to run setup event");
1081 ctdb_run_notification_script(ctdb, "setup");
1083 /* tell all other nodes we've just started up */
1084 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1085 0, CTDB_CONTROL_STARTUP, 0,
1086 CTDB_CTRL_FLAG_NOREPLY,
1087 tdb_null, NULL, NULL);
1089 /* Start the recovery daemon */
1090 if (ctdb_start_recoverd(ctdb) != 0) {
1091 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1092 exit(11);
1095 ctdb_start_periodic_events(ctdb);
1097 ctdb_wait_for_first_recovery(ctdb);
1100 static struct timeval tevent_before_wait_ts;
1101 static struct timeval tevent_after_wait_ts;
1103 static void ctdb_tevent_trace_init(void)
1105 struct timeval now;
1107 now = timeval_current();
1109 tevent_before_wait_ts = now;
1110 tevent_after_wait_ts = now;
1113 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1114 void *private_data)
1116 struct timeval diff;
1117 struct timeval now;
1118 struct ctdb_context *ctdb =
1119 talloc_get_type(private_data, struct ctdb_context);
1121 if (getpid() != ctdb->ctdbd_pid) {
1122 return;
1125 now = timeval_current();
1127 switch (tp) {
1128 case TEVENT_TRACE_BEFORE_WAIT:
1129 diff = timeval_until(&tevent_after_wait_ts, &now);
1130 if (diff.tv_sec > 3) {
1131 DEBUG(DEBUG_ERR,
1132 ("Handling event took %ld seconds!\n",
1133 (long)diff.tv_sec));
1135 tevent_before_wait_ts = now;
1136 break;
1138 case TEVENT_TRACE_AFTER_WAIT:
1139 diff = timeval_until(&tevent_before_wait_ts, &now);
1140 if (diff.tv_sec > 3) {
1141 DEBUG(DEBUG_ERR,
1142 ("No event for %ld seconds!\n",
1143 (long)diff.tv_sec));
1145 tevent_after_wait_ts = now;
1146 break;
1148 default:
1149 /* Do nothing for future tevent trace points */ ;
1153 static void ctdb_remove_pidfile(void)
1155 TALLOC_FREE(ctdbd_pidfile_ctx);
1158 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1160 if (ctdbd_pidfile != NULL) {
1161 int ret = pidfile_create(mem_ctx, ctdbd_pidfile,
1162 &ctdbd_pidfile_ctx);
1163 if (ret != 0) {
1164 DEBUG(DEBUG_ERR,
1165 ("Failed to create PID file %s\n",
1166 ctdbd_pidfile));
1167 exit(11);
1170 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1171 atexit(ctdb_remove_pidfile);
1175 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1177 int i, j, count;
1179 /* initialize the vnn mapping table, skipping any deleted nodes */
1180 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1181 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1183 count = 0;
1184 for (i = 0; i < ctdb->num_nodes; i++) {
1185 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1186 count++;
1190 ctdb->vnn_map->generation = INVALID_GENERATION;
1191 ctdb->vnn_map->size = count;
1192 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1193 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1195 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1196 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1197 continue;
1199 ctdb->vnn_map->map[j] = i;
1200 j++;
1204 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1206 int nodeid;
1208 if (ctdb->address == NULL) {
1209 ctdb_fatal(ctdb,
1210 "Can not determine PNN - node address is not set\n");
1213 nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1214 if (nodeid == -1) {
1215 ctdb_fatal(ctdb,
1216 "Can not determine PNN - node address not found in node list\n");
1219 ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1220 DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1224 start the protocol going as a daemon
1226 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1228 int res, ret = -1;
1229 struct tevent_fd *fde;
1231 if (do_fork && fork()) {
1232 return 0;
1235 if (do_fork) {
1236 if (setsid() == -1) {
1237 ctdb_die(ctdb, "Failed to setsid()\n");
1239 close(0);
1240 if (open("/dev/null", O_RDONLY) != 0) {
1241 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1242 exit(11);
1245 ignore_signal(SIGPIPE);
1246 ignore_signal(SIGUSR1);
1248 ctdb->ctdbd_pid = getpid();
1249 DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1250 CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1251 ctdb_create_pidfile(ctdb);
1253 /* create a unix domain stream socket to listen to */
1254 res = ux_socket_bind(ctdb);
1255 if (res!=0) {
1256 DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1257 exit(10);
1260 /* Make sure we log something when the daemon terminates.
1261 * This must be the first exit handler to run (so the last to
1262 * be registered.
1264 __ctdbd_pid = getpid();
1265 atexit(print_exit_message);
1267 if (ctdb->do_setsched) {
1268 /* try to set us up as realtime */
1269 if (!set_scheduler()) {
1270 exit(1);
1272 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1275 ctdb->ev = tevent_context_init(NULL);
1276 if (ctdb->ev == NULL) {
1277 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1278 exit(1);
1280 tevent_loop_allow_nesting(ctdb->ev);
1281 ctdb_tevent_trace_init();
1282 tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1284 /* set up a handler to pick up sigchld */
1285 if (ctdb_init_sigchld(ctdb) == NULL) {
1286 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1287 exit(1);
1290 if (do_fork) {
1291 ctdb_set_child_logging(ctdb);
1294 TALLOC_FREE(ctdb->srv);
1295 if (srvid_init(ctdb, &ctdb->srv) != 0) {
1296 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1297 exit(1);
1300 /* initialize statistics collection */
1301 ctdb_statistics_init(ctdb);
1303 /* force initial recovery for election */
1304 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1306 if (ctdb_start_eventd(ctdb) != 0) {
1307 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1308 exit(1);
1311 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1312 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1313 if (ret != 0) {
1314 ctdb_die(ctdb, "Failed to run init event\n");
1316 ctdb_run_notification_script(ctdb, "init");
1318 if (strcmp(ctdb->transport, "tcp") == 0) {
1319 ret = ctdb_tcp_init(ctdb);
1321 #ifdef USE_INFINIBAND
1322 if (strcmp(ctdb->transport, "ib") == 0) {
1323 ret = ctdb_ibw_init(ctdb);
1325 #endif
1326 if (ret != 0) {
1327 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1328 return -1;
1331 if (ctdb->methods == NULL) {
1332 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1333 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1336 /* Initialise the transport. This sets the node address if it
1337 * was not set via the command-line. */
1338 if (ctdb->methods->initialise(ctdb) != 0) {
1339 ctdb_fatal(ctdb, "transport failed to initialise");
1342 ctdb_set_my_pnn(ctdb);
1344 initialise_node_flags(ctdb);
1346 if (ctdb->public_addresses_file) {
1347 ret = ctdb_set_public_addresses(ctdb, true);
1348 if (ret == -1) {
1349 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1350 exit(1);
1354 ctdb_initialise_vnn_map(ctdb);
1356 /* attach to existing databases */
1357 if (ctdb_attach_databases(ctdb) != 0) {
1358 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1361 /* start frozen, then let the first election sort things out */
1362 if (!ctdb_blocking_freeze(ctdb)) {
1363 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1366 /* now start accepting clients, only can do this once frozen */
1367 fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1368 ctdb_accept_client, ctdb);
1369 if (fde == NULL) {
1370 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1372 tevent_fd_set_auto_close(fde);
1374 /* Start the transport */
1375 if (ctdb->methods->start(ctdb) != 0) {
1376 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1377 ctdb_fatal(ctdb, "transport failed to start");
1380 /* Recovery daemon and timed events are started from the
1381 * callback, only after the setup event completes
1382 * successfully.
1384 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1385 ret = ctdb_event_script_callback(ctdb,
1386 ctdb,
1387 ctdb_setup_event_callback,
1388 ctdb,
1389 CTDB_EVENT_SETUP,
1390 "%s",
1391 "");
1392 if (ret != 0) {
1393 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1394 exit(1);
1397 lockdown_memory(ctdb->valgrinding);
1399 /* go into a wait loop to allow other nodes to complete */
1400 tevent_loop_wait(ctdb->ev);
1402 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1403 exit(1);
1407 allocate a packet for use in daemon<->daemon communication
1409 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1410 TALLOC_CTX *mem_ctx,
1411 enum ctdb_operation operation,
1412 size_t length, size_t slength,
1413 const char *type)
1415 int size;
1416 struct ctdb_req_header *hdr;
1418 length = MAX(length, slength);
1419 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1421 if (ctdb->methods == NULL) {
1422 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1423 operation, (unsigned)length));
1424 return NULL;
1427 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1428 if (hdr == NULL) {
1429 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1430 operation, (unsigned)length));
1431 return NULL;
1433 talloc_set_name_const(hdr, type);
1434 memset(hdr, 0, slength);
1435 hdr->length = length;
1436 hdr->operation = operation;
1437 hdr->ctdb_magic = CTDB_MAGIC;
1438 hdr->ctdb_version = CTDB_PROTOCOL;
1439 hdr->generation = ctdb->vnn_map->generation;
1440 hdr->srcnode = ctdb->pnn;
1442 return hdr;
/*
  State kept for a control that a local client asked the daemon to
  send, used to route the reply back to that client.
 */
struct daemon_control_state {
	/* linkage in the destination node's pending_controls list */
	struct daemon_control_state *next, *prev;
	/* the client that issued the control */
	struct ctdb_client *client;
	/* the original request; owned by this state */
	struct ctdb_req_control_old *c;
	/* request id of the client's request, echoed in the reply */
	uint32_t reqid;
	/* destination node, or NULL if the destination is not a valid PNN */
	struct ctdb_node *node;
};
1454 callback when a control reply comes in
1456 static void daemon_control_callback(struct ctdb_context *ctdb,
1457 int32_t status, TDB_DATA data,
1458 const char *errormsg,
1459 void *private_data)
1461 struct daemon_control_state *state = talloc_get_type(private_data,
1462 struct daemon_control_state);
1463 struct ctdb_client *client = state->client;
1464 struct ctdb_reply_control_old *r;
1465 size_t len;
1466 int ret;
1468 /* construct a message to send to the client containing the data */
1469 len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1470 if (errormsg) {
1471 len += strlen(errormsg);
1473 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1474 struct ctdb_reply_control_old);
1475 CTDB_NO_MEMORY_VOID(ctdb, r);
1477 r->hdr.reqid = state->reqid;
1478 r->status = status;
1479 r->datalen = data.dsize;
1480 r->errorlen = 0;
1481 memcpy(&r->data[0], data.dptr, data.dsize);
1482 if (errormsg) {
1483 r->errorlen = strlen(errormsg);
1484 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1487 ret = daemon_queue_send(client, &r->hdr);
1488 if (ret != -1) {
1489 talloc_free(state);
1494 fail all pending controls to a disconnected node
1496 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1498 struct daemon_control_state *state;
1499 while ((state = node->pending_controls)) {
1500 DLIST_REMOVE(node->pending_controls, state);
1501 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1502 "node is disconnected", state);
1507 destroy a daemon_control_state
1509 static int daemon_control_destructor(struct daemon_control_state *state)
1511 if (state->node) {
1512 DLIST_REMOVE(state->node->pending_controls, state);
1514 return 0;
1518 this is called when the ctdb daemon received a ctdb request control
1519 from a local client over the unix domain socket
1521 static void daemon_request_control_from_client(struct ctdb_client *client,
1522 struct ctdb_req_control_old *c)
1524 TDB_DATA data;
1525 int res;
1526 struct daemon_control_state *state;
1527 TALLOC_CTX *tmp_ctx = talloc_new(client);
1529 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1530 c->hdr.destnode = client->ctdb->pnn;
1533 state = talloc(client, struct daemon_control_state);
1534 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1536 state->client = client;
1537 state->c = talloc_steal(state, c);
1538 state->reqid = c->hdr.reqid;
1539 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1540 state->node = client->ctdb->nodes[c->hdr.destnode];
1541 DLIST_ADD(state->node->pending_controls, state);
1542 } else {
1543 state->node = NULL;
1546 talloc_set_destructor(state, daemon_control_destructor);
1548 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1549 talloc_steal(tmp_ctx, state);
1552 data.dptr = &c->data[0];
1553 data.dsize = c->datalen;
1554 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1555 c->srvid, c->opcode, client->client_id,
1556 c->flags,
1557 data, daemon_control_callback,
1558 state);
1559 if (res != 0) {
1560 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1561 c->hdr.destnode));
1564 talloc_free(tmp_ctx);
1568 register a call function
1570 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1571 ctdb_fn_t fn, int id)
1573 struct ctdb_registered_call *call;
1574 struct ctdb_db_context *ctdb_db;
1576 ctdb_db = find_ctdb_db(ctdb, db_id);
1577 if (ctdb_db == NULL) {
1578 return -1;
1581 call = talloc(ctdb_db, struct ctdb_registered_call);
1582 call->fn = fn;
1583 call->id = id;
1585 DLIST_ADD(ctdb_db->calls, call);
1586 return 0;
1592 this local messaging handler is ugly, but is needed to prevent
1593 recursion in ctdb_send_message() when the destination node is the
1594 same as the source node
1596 struct ctdb_local_message {
1597 struct ctdb_context *ctdb;
1598 uint64_t srvid;
1599 TDB_DATA data;
1602 static void ctdb_local_message_trigger(struct tevent_context *ev,
1603 struct tevent_timer *te,
1604 struct timeval t, void *private_data)
1606 struct ctdb_local_message *m = talloc_get_type(
1607 private_data, struct ctdb_local_message);
1609 srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1610 talloc_free(m);
1613 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1615 struct ctdb_local_message *m;
1616 m = talloc(ctdb, struct ctdb_local_message);
1617 CTDB_NO_MEMORY(ctdb, m);
1619 m->ctdb = ctdb;
1620 m->srvid = srvid;
1621 m->data = data;
1622 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1623 if (m->data.dptr == NULL) {
1624 talloc_free(m);
1625 return -1;
1628 /* this needs to be done as an event to prevent recursion */
1629 tevent_add_timer(ctdb->ev, m, timeval_zero(),
1630 ctdb_local_message_trigger, m);
1631 return 0;
1635 send a ctdb message
1637 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1638 uint64_t srvid, TDB_DATA data)
1640 struct ctdb_req_message_old *r;
1641 int len;
1643 if (ctdb->methods == NULL) {
1644 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1645 return -1;
1648 /* see if this is a message to ourselves */
1649 if (pnn == ctdb->pnn) {
1650 return ctdb_local_message(ctdb, srvid, data);
1653 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1654 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1655 struct ctdb_req_message_old);
1656 CTDB_NO_MEMORY(ctdb, r);
1658 r->hdr.destnode = pnn;
1659 r->srvid = srvid;
1660 r->datalen = data.dsize;
1661 memcpy(&r->data[0], data.dptr, data.dsize);
1663 ctdb_queue_packet(ctdb, &r->hdr);
1665 talloc_free(r);
1666 return 0;
1671 struct ctdb_client_notify_list {
1672 struct ctdb_client_notify_list *next, *prev;
1673 struct ctdb_context *ctdb;
1674 uint64_t srvid;
1675 TDB_DATA data;
1679 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1681 int ret;
1683 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1685 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1686 if (ret != 0) {
1687 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1690 return 0;
1693 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1695 struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1696 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1697 struct ctdb_client_notify_list *nl;
1699 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1701 if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1702 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1703 return -1;
1706 if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1707 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1708 return -1;
1712 if (client == NULL) {
1713 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1714 return -1;
1717 for(nl=client->notify; nl; nl=nl->next) {
1718 if (nl->srvid == notify->srvid) {
1719 break;
1722 if (nl != NULL) {
1723 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1724 return -1;
1727 nl = talloc(client, struct ctdb_client_notify_list);
1728 CTDB_NO_MEMORY(ctdb, nl);
1729 nl->ctdb = ctdb;
1730 nl->srvid = notify->srvid;
1731 nl->data.dsize = notify->len;
1732 nl->data.dptr = talloc_memdup(nl, notify->notify_data,
1733 nl->data.dsize);
1734 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1736 DLIST_ADD(client->notify, nl);
1737 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1739 return 0;
1742 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1744 uint64_t srvid = *(uint64_t *)indata.dptr;
1745 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1746 struct ctdb_client_notify_list *nl;
1748 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1750 if (client == NULL) {
1751 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1752 return -1;
1755 for(nl=client->notify; nl; nl=nl->next) {
1756 if (nl->srvid == srvid) {
1757 break;
1760 if (nl == NULL) {
1761 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1762 return -1;
1765 DLIST_REMOVE(client->notify, nl);
1766 talloc_set_destructor(nl, NULL);
1767 talloc_free(nl);
1769 return 0;
1772 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1774 struct ctdb_client_pid_list *client_pid;
1776 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1777 if (client_pid->pid == pid) {
1778 return client_pid->client;
1781 return NULL;
1785 /* This control is used by samba when probing if a process (of a samba daemon)
1786 exists on the node.
1787 Samba does this when it needs/wants to check if a subrecord in one of the
1788 databases is still valied, or if it is stale and can be removed.
1789 If the node is in unhealthy or stopped state we just kill of the samba
1790 process holding htis sub-record and return to the calling samba that
1791 the process does not exist.
1792 This allows us to forcefully recall subrecords registered by samba processes
1793 on banned and stopped nodes.
1795 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1797 struct ctdb_client *client;
1799 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1800 client = ctdb_find_client_by_pid(ctdb, pid);
1801 if (client != NULL) {
1802 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1803 talloc_free(client);
1805 return -1;
1808 return kill(pid, 0);
1811 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1813 struct ctdb_node_map_old *node_map = NULL;
1815 CHECK_CONTROL_DATA_SIZE(0);
1817 node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1818 if (node_map == NULL) {
1819 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1820 return -1;
1823 outdata->dptr = (unsigned char *)node_map;
1824 outdata->dsize = talloc_get_size(outdata->dptr);
1826 return 0;
1829 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1831 if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1832 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1833 return;
1836 DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1837 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1838 ctdb_stop_recoverd(ctdb);
1839 ctdb_stop_keepalive(ctdb);
1840 ctdb_stop_monitoring(ctdb);
1841 ctdb_release_all_ips(ctdb);
1842 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1843 ctdb_stop_eventd(ctdb);
1844 if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1845 ctdb->methods->shutdown(ctdb);
1848 DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1849 exit(exit_code);
1852 /* When forking the main daemon and the child process needs to connect
1853 * back to the daemon as a client process, this function can be used
1854 * to change the ctdb context from daemon into client mode. The child
1855 * process must be created using ctdb_fork() and not fork() -
1856 * ctdb_fork() does some necessary housekeeping.
1858 int switch_from_server_to_client(struct ctdb_context *ctdb)
1860 int ret;
1862 /* get a new event context */
1863 ctdb->ev = tevent_context_init(ctdb);
1864 if (ctdb->ev == NULL) {
1865 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1866 exit(1);
1868 tevent_loop_allow_nesting(ctdb->ev);
1870 /* Connect to main CTDB daemon */
1871 ret = ctdb_socket_connect(ctdb);
1872 if (ret != 0) {
1873 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1874 return -1;
1877 ctdb->can_send_controls = true;
1879 return 0;