4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
/*
  Entry in the ctdb->client_pids list: ties the PID of a connected
  client process to its ctdb_client context.
 */
struct ctdb_client_pid_list {
	struct ctdb_client_pid_list *next, *prev;	/* dlinklist linkage */
	struct ctdb_context *ctdb;
	/* NOTE(review): member restored from its use in
	   ctdb_accept_client(); the pid_t type is assumed - confirm */
	pid_t pid;
	struct ctdb_client *client;
};
/* Path of the daemon PID file; NULL means no PID file is written or removed */
const char *ctdbd_pidfile = NULL;

static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
44 static void print_exit_message(void)
46 if (debug_extra
!= NULL
&& debug_extra
[0] != '\0') {
47 DEBUG(DEBUG_NOTICE
,("CTDB %s shutting down\n", debug_extra
));
49 DEBUG(DEBUG_NOTICE
,("CTDB daemon shutting down\n"));
51 /* Wait a second to allow pending log messages to be flushed */
58 static void ctdb_time_tick(struct event_context
*ev
, struct timed_event
*te
,
59 struct timeval t
, void *private_data
)
61 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
63 if (getpid() != ctdbd_pid
) {
67 event_add_timed(ctdb
->ev
, ctdb
,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick
, ctdb
);
72 /* Used to trigger a dummy event once per second, to make
73 * detection of hangs more reliable.
75 static void ctdb_start_time_tickd(struct ctdb_context
*ctdb
)
77 event_add_timed(ctdb
->ev
, ctdb
,
78 timeval_current_ofs(1, 0),
79 ctdb_time_tick
, ctdb
);
/*
  Start all periodic activities of the daemon, in the order listed below.
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* start monitoring for connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* start monitoring for node health */
	ctdb_start_monitoring(ctdb);

	/* start periodic update of tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* start listening for recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* start listening to timer ticks */
	ctdb_start_time_tickd(ctdb);
}
100 static void block_signal(int signum
)
102 struct sigaction act
;
104 memset(&act
, 0, sizeof(act
));
106 act
.sa_handler
= SIG_IGN
;
107 sigemptyset(&act
.sa_mask
);
108 sigaddset(&act
.sa_mask
, signum
);
109 sigaction(signum
, &act
, NULL
);
114 send a packet to a client
116 static int daemon_queue_send(struct ctdb_client
*client
, struct ctdb_req_header
*hdr
)
118 CTDB_INCREMENT_STAT(client
->ctdb
, client_packets_sent
);
119 if (hdr
->operation
== CTDB_REQ_MESSAGE
) {
120 if (ctdb_queue_length(client
->queue
) > client
->ctdb
->tunable
.max_queue_depth_drop_msg
) {
121 DEBUG(DEBUG_ERR
,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
126 return ctdb_queue_send(client
->queue
, (uint8_t *)hdr
, hdr
->length
);
130 message handler for when we are in daemon mode. This redirects the message
133 static void daemon_message_handler(struct ctdb_context
*ctdb
, uint64_t srvid
,
134 TDB_DATA data
, void *private_data
)
136 struct ctdb_client
*client
= talloc_get_type(private_data
, struct ctdb_client
);
137 struct ctdb_req_message
*r
;
140 /* construct a message to send to the client containing the data */
141 len
= offsetof(struct ctdb_req_message
, data
) + data
.dsize
;
142 r
= ctdbd_allocate_pkt(ctdb
, ctdb
, CTDB_REQ_MESSAGE
,
143 len
, struct ctdb_req_message
);
144 CTDB_NO_MEMORY_VOID(ctdb
, r
);
146 talloc_set_name_const(r
, "req_message packet");
149 r
->datalen
= data
.dsize
;
150 memcpy(&r
->data
[0], data
.dptr
, data
.dsize
);
152 daemon_queue_send(client
, &r
->hdr
);
158 this is called when the ctdb daemon received a ctdb request to
159 set the srvid from the client
161 int daemon_register_message_handler(struct ctdb_context
*ctdb
, uint32_t client_id
, uint64_t srvid
)
163 struct ctdb_client
*client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
165 if (client
== NULL
) {
166 DEBUG(DEBUG_ERR
,("Bad client_id in daemon_request_register_message_handler\n"));
169 res
= ctdb_register_message_handler(ctdb
, client
, srvid
, daemon_message_handler
, client
);
171 DEBUG(DEBUG_ERR
,(__location__
" Failed to register handler %llu in daemon\n",
172 (unsigned long long)srvid
));
174 DEBUG(DEBUG_INFO
,(__location__
" Registered message handler for srvid=%llu\n",
175 (unsigned long long)srvid
));
182 this is called when the ctdb daemon received a ctdb request to
183 remove a srvid from the client
185 int daemon_deregister_message_handler(struct ctdb_context
*ctdb
, uint32_t client_id
, uint64_t srvid
)
187 struct ctdb_client
*client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
188 if (client
== NULL
) {
189 DEBUG(DEBUG_ERR
,("Bad client_id in daemon_request_deregister_message_handler\n"));
192 return ctdb_deregister_message_handler(ctdb
, srvid
, client
);
195 int daemon_check_srvids(struct ctdb_context
*ctdb
, TDB_DATA indata
,
202 if ((indata
.dsize
% sizeof(uint64_t)) != 0) {
203 DEBUG(DEBUG_ERR
, ("Bad indata in daemon_check_srvids, "
204 "size=%d\n", (int)indata
.dsize
));
208 ids
= (uint64_t *)indata
.dptr
;
209 num_ids
= indata
.dsize
/ 8;
211 results
= talloc_zero_array(outdata
, uint8_t, (num_ids
+7)/8);
212 if (results
== NULL
) {
213 DEBUG(DEBUG_ERR
, ("talloc failed in daemon_check_srvids\n"));
216 for (i
=0; i
<num_ids
; i
++) {
217 if (ctdb_check_message_handler(ctdb
, ids
[i
])) {
218 results
[i
/8] |= (1 << (i
%8));
221 outdata
->dptr
= (uint8_t *)results
;
222 outdata
->dsize
= talloc_get_size(results
);
227 destroy a ctdb_client
229 static int ctdb_client_destructor(struct ctdb_client
*client
)
231 struct ctdb_db_context
*ctdb_db
;
233 ctdb_takeover_client_destructor_hook(client
);
234 ctdb_reqid_remove(client
->ctdb
, client
->client_id
);
235 client
->ctdb
->num_clients
--;
237 if (client
->num_persistent_updates
!= 0) {
238 DEBUG(DEBUG_ERR
,(__location__
" Client disconnecting with %u persistent updates in flight. Starting recovery\n", client
->num_persistent_updates
));
239 client
->ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
241 ctdb_db
= find_ctdb_db(client
->ctdb
, client
->db_id
);
243 DEBUG(DEBUG_ERR
, (__location__
" client exit while transaction "
244 "commit active. Forcing recovery.\n"));
245 client
->ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
247 /* legacy trans2 transaction state: */
248 ctdb_db
->transaction_active
= false;
251 * trans3 transaction state:
253 * The destructor sets the pointer to NULL.
255 talloc_free(ctdb_db
->persistent_state
);
263 this is called when the ctdb daemon received a ctdb request message
264 from a local client over the unix domain socket
266 static void daemon_request_message_from_client(struct ctdb_client
*client
,
267 struct ctdb_req_message
*c
)
272 if (c
->hdr
.destnode
== CTDB_CURRENT_NODE
) {
273 c
->hdr
.destnode
= ctdb_get_pnn(client
->ctdb
);
276 /* maybe the message is for another client on this node */
277 if (ctdb_get_pnn(client
->ctdb
)==c
->hdr
.destnode
) {
278 ctdb_request_message(client
->ctdb
, (struct ctdb_req_header
*)c
);
282 /* its for a remote node */
283 data
.dptr
= &c
->data
[0];
284 data
.dsize
= c
->datalen
;
285 res
= ctdb_daemon_send_message(client
->ctdb
, c
->hdr
.destnode
,
288 DEBUG(DEBUG_ERR
,(__location__
" Failed to send message to remote node %u\n",
/*
  Per-request state for a call issued by a local client; created in
  daemon_request_call_from_client() and consumed by
  daemon_call_from_client_callback().
 */
struct daemon_call_state {
	struct ctdb_client *client;
	/* request id copied from the client's request header.
	   NOTE(review): member restored from its uses; uint32_t assumed */
	uint32_t reqid;
	struct ctdb_call *call;
	struct timeval start_time;

	/* readonly request ? */
	uint32_t readonly_fetch;
	uint32_t client_callid;	/* call id the client originally asked for */
};
306 complete a call from a client
308 static void daemon_call_from_client_callback(struct ctdb_call_state
*state
)
310 struct daemon_call_state
*dstate
= talloc_get_type(state
->async
.private_data
,
311 struct daemon_call_state
);
312 struct ctdb_reply_call
*r
;
315 struct ctdb_client
*client
= dstate
->client
;
316 struct ctdb_db_context
*ctdb_db
= state
->ctdb_db
;
318 talloc_steal(client
, dstate
);
319 talloc_steal(dstate
, dstate
->call
);
321 res
= ctdb_daemon_call_recv(state
, dstate
->call
);
323 DEBUG(DEBUG_ERR
, (__location__
" ctdbd_call_recv() returned error\n"));
324 CTDB_DECREMENT_STAT(client
->ctdb
, pending_calls
);
326 CTDB_UPDATE_LATENCY(client
->ctdb
, ctdb_db
, "call_from_client_cb 1", call_latency
, dstate
->start_time
);
330 length
= offsetof(struct ctdb_reply_call
, data
) + dstate
->call
->reply_data
.dsize
;
331 /* If the client asked for readonly FETCH, we remapped this to
332 FETCH_WITH_HEADER when calling the daemon. So we must
333 strip the extra header off the reply data before passing
334 it back to the client.
336 if (dstate
->readonly_fetch
337 && dstate
->client_callid
== CTDB_FETCH_FUNC
) {
338 length
-= sizeof(struct ctdb_ltdb_header
);
341 r
= ctdbd_allocate_pkt(client
->ctdb
, dstate
, CTDB_REPLY_CALL
,
342 length
, struct ctdb_reply_call
);
344 DEBUG(DEBUG_ERR
, (__location__
" Failed to allocate reply_call in ctdb daemon\n"));
345 CTDB_DECREMENT_STAT(client
->ctdb
, pending_calls
);
346 CTDB_UPDATE_LATENCY(client
->ctdb
, ctdb_db
, "call_from_client_cb 2", call_latency
, dstate
->start_time
);
349 r
->hdr
.reqid
= dstate
->reqid
;
350 r
->status
= dstate
->call
->status
;
352 if (dstate
->readonly_fetch
353 && dstate
->client_callid
== CTDB_FETCH_FUNC
) {
354 /* client only asked for a FETCH so we must strip off
355 the extra ctdb_ltdb header
357 r
->datalen
= dstate
->call
->reply_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
358 memcpy(&r
->data
[0], dstate
->call
->reply_data
.dptr
+ sizeof(struct ctdb_ltdb_header
), r
->datalen
);
360 r
->datalen
= dstate
->call
->reply_data
.dsize
;
361 memcpy(&r
->data
[0], dstate
->call
->reply_data
.dptr
, r
->datalen
);
364 res
= daemon_queue_send(client
, &r
->hdr
);
366 /* client is dead - return immediately */
370 DEBUG(DEBUG_ERR
, (__location__
" Failed to queue packet from daemon to client\n"));
372 CTDB_UPDATE_LATENCY(client
->ctdb
, ctdb_db
, "call_from_client_cb 3", call_latency
, dstate
->start_time
);
373 CTDB_DECREMENT_STAT(client
->ctdb
, pending_calls
);
/*
  Identifies a client by id rather than by pointer, so that a packet
  requeued for later processing can detect that the client has gone away.
 */
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;
	/* NOTE(review): member restored from its uses; uint32_t assumed */
	uint32_t client_id;
};
383 a wrapper to catch disconnected clients
385 static void daemon_incoming_packet_wrap(void *p
, struct ctdb_req_header
*hdr
)
387 struct ctdb_client
*client
;
388 struct ctdb_daemon_packet_wrap
*w
= talloc_get_type(p
,
389 struct ctdb_daemon_packet_wrap
);
391 DEBUG(DEBUG_CRIT
,(__location__
" Bad packet type '%s'\n", talloc_get_name(p
)));
395 client
= ctdb_reqid_find(w
->ctdb
, w
->client_id
, struct ctdb_client
);
396 if (client
== NULL
) {
397 DEBUG(DEBUG_ERR
,(__location__
" Packet for disconnected client %u\n",
405 daemon_incoming_packet(client
, hdr
);
/* One deferred client call waiting on a ctdb_deferred_fetch_queue */
struct ctdb_deferred_fetch_call {
	struct ctdb_deferred_fetch_call *next, *prev;
	struct ctdb_req_call *c;		/* the deferred request packet */
	struct ctdb_daemon_packet_wrap *w;	/* identifies the issuing client */
};
/* List of calls deferred behind an in-flight fetch of the same key */
struct ctdb_deferred_fetch_queue {
	struct ctdb_deferred_fetch_call *deferred_calls;
};
/* Private data of the timed event that reprocesses one deferred call */
struct ctdb_deferred_requeue {
	struct ctdb_deferred_fetch_call *dfc;
	struct ctdb_client *client;
};
423 /* called from a timer event and starts reprocessing the deferred call.*/
424 static void reprocess_deferred_call(struct event_context
*ev
, struct timed_event
*te
,
425 struct timeval t
, void *private_data
)
427 struct ctdb_deferred_requeue
*dfr
= (struct ctdb_deferred_requeue
*)private_data
;
428 struct ctdb_client
*client
= dfr
->client
;
430 talloc_steal(client
, dfr
->dfc
->c
);
431 daemon_incoming_packet(client
, (struct ctdb_req_header
*)dfr
->dfc
->c
);
435 /* the referral context is destroyed either after a timeout or when the initial
436 fetch-lock has finished.
437 at this stage, immediately start reprocessing the queued up deferred
438 calls so they get reprocessed immediately (and since we are dmaster at
439 this stage, trigger the waiting smbd processes to pick up and aquire the
442 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue
*dfq
)
445 /* need to reprocess the packets from the queue explicitely instead of
446 just using a normal destructor since we want, need, to
447 call the clients in the same oder as the requests queued up
449 while (dfq
->deferred_calls
!= NULL
) {
450 struct ctdb_client
*client
;
451 struct ctdb_deferred_fetch_call
*dfc
= dfq
->deferred_calls
;
452 struct ctdb_deferred_requeue
*dfr
;
454 DLIST_REMOVE(dfq
->deferred_calls
, dfc
);
456 client
= ctdb_reqid_find(dfc
->w
->ctdb
, dfc
->w
->client_id
, struct ctdb_client
);
457 if (client
== NULL
) {
458 DEBUG(DEBUG_ERR
,(__location__
" Packet for disconnected client %u\n",
463 /* process it by pushing it back onto the eventloop */
464 dfr
= talloc(client
, struct ctdb_deferred_requeue
);
466 DEBUG(DEBUG_ERR
,("Failed to allocate deferred fetch requeue structure\n"));
470 dfr
->dfc
= talloc_steal(dfr
, dfc
);
471 dfr
->client
= client
;
473 event_add_timed(dfc
->w
->ctdb
->ev
, client
, timeval_zero(), reprocess_deferred_call
, dfr
);
479 /* insert the new deferral context into the rb tree.
480 there should never be a pre-existing context here, but check for it
481 warn and destroy the previous context if there is already a deferral context
484 static void *insert_dfq_callback(void *parm
, void *data
)
487 DEBUG(DEBUG_ERR
,("Already have DFQ registered. Free old %p and create new %p\n", data
, parm
));
/* if the original fetch-lock did not complete within a reasonable time,
   free the context and context for all deferred requests to cause them to be
   re-inserted into the event system.
   private_data is the context to free.
*/
static void dfq_timeout(struct event_context *ev, struct timed_event *te,
			struct timeval t, void *private_data)
{
	talloc_free(private_data);
}
503 /* This function is used in the local daemon to register a KEY in a database
505 While the remote fetch is in-flight, any futher attempts to re-fetch the
506 same record will be deferred until the fetch completes.
508 static int setup_deferred_fetch_locks(struct ctdb_db_context
*ctdb_db
, struct ctdb_call
*call
)
511 struct ctdb_deferred_fetch_queue
*dfq
;
513 k
= talloc_zero_size(call
, ((call
->key
.dsize
+ 3) & 0xfffffffc) + 4);
515 DEBUG(DEBUG_ERR
,("Failed to allocate key for deferred fetch\n"));
519 k
[0] = (call
->key
.dsize
+ 3) / 4 + 1;
520 memcpy(&k
[1], call
->key
.dptr
, call
->key
.dsize
);
522 dfq
= talloc(call
, struct ctdb_deferred_fetch_queue
);
524 DEBUG(DEBUG_ERR
,("Failed to allocate key for deferred fetch queue structure\n"));
528 dfq
->deferred_calls
= NULL
;
530 trbt_insertarray32_callback(ctdb_db
->deferred_fetch
, k
[0], &k
[0], insert_dfq_callback
, dfq
);
532 talloc_set_destructor(dfq
, deferred_fetch_queue_destructor
);
534 /* if the fetch havent completed in 30 seconds, just tear it all down
535 and let it try again as the events are reissued */
536 event_add_timed(ctdb_db
->ctdb
->ev
, dfq
, timeval_current_ofs(30, 0), dfq_timeout
, dfq
);
542 /* check if this is a duplicate request to a fetch already in-flight
543 if it is, make this call deferred to be reprocessed later when
544 the in-flight fetch completes.
546 static int requeue_duplicate_fetch(struct ctdb_db_context
*ctdb_db
, struct ctdb_client
*client
, TDB_DATA key
, struct ctdb_req_call
*c
)
549 struct ctdb_deferred_fetch_queue
*dfq
;
550 struct ctdb_deferred_fetch_call
*dfc
;
552 k
= talloc_zero_size(c
, ((key
.dsize
+ 3) & 0xfffffffc) + 4);
554 DEBUG(DEBUG_ERR
,("Failed to allocate key for deferred fetch\n"));
558 k
[0] = (key
.dsize
+ 3) / 4 + 1;
559 memcpy(&k
[1], key
.dptr
, key
.dsize
);
561 dfq
= trbt_lookuparray32(ctdb_db
->deferred_fetch
, k
[0], &k
[0]);
570 dfc
= talloc(dfq
, struct ctdb_deferred_fetch_call
);
572 DEBUG(DEBUG_ERR
, ("Failed to allocate deferred fetch call structure\n"));
576 dfc
->w
= talloc(dfc
, struct ctdb_daemon_packet_wrap
);
577 if (dfc
->w
== NULL
) {
578 DEBUG(DEBUG_ERR
,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
583 dfc
->c
= talloc_steal(dfc
, c
);
584 dfc
->w
->ctdb
= ctdb_db
->ctdb
;
585 dfc
->w
->client_id
= client
->client_id
;
587 DLIST_ADD_END(dfq
->deferred_calls
, dfc
, NULL
);
594 this is called when the ctdb daemon received a ctdb request call
595 from a local client over the unix domain socket
597 static void daemon_request_call_from_client(struct ctdb_client
*client
,
598 struct ctdb_req_call
*c
)
600 struct ctdb_call_state
*state
;
601 struct ctdb_db_context
*ctdb_db
;
602 struct daemon_call_state
*dstate
;
603 struct ctdb_call
*call
;
604 struct ctdb_ltdb_header header
;
607 struct ctdb_context
*ctdb
= client
->ctdb
;
608 struct ctdb_daemon_packet_wrap
*w
;
610 CTDB_INCREMENT_STAT(ctdb
, total_calls
);
611 CTDB_DECREMENT_STAT(ctdb
, pending_calls
);
613 ctdb_db
= find_ctdb_db(client
->ctdb
, c
->db_id
);
615 DEBUG(DEBUG_ERR
, (__location__
" Unknown database in request. db_id==0x%08x",
617 CTDB_DECREMENT_STAT(ctdb
, pending_calls
);
621 if (ctdb_db
->unhealthy_reason
) {
623 * this is just a warning, as the tdb should be empty anyway,
624 * and only persistent databases can be unhealthy, which doesn't
625 * use this code patch
627 DEBUG(DEBUG_WARNING
,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
628 ctdb_db
->db_name
, ctdb_db
->unhealthy_reason
));
632 key
.dsize
= c
->keylen
;
634 w
= talloc(ctdb
, struct ctdb_daemon_packet_wrap
);
635 CTDB_NO_MEMORY_VOID(ctdb
, w
);
638 w
->client_id
= client
->client_id
;
640 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, key
, &header
,
641 (struct ctdb_req_header
*)c
, &data
,
642 daemon_incoming_packet_wrap
, w
, true);
644 /* will retry later */
645 CTDB_DECREMENT_STAT(ctdb
, pending_calls
);
652 DEBUG(DEBUG_ERR
,(__location__
" Unable to fetch record\n"));
653 CTDB_DECREMENT_STAT(ctdb
, pending_calls
);
658 /* check if this fetch request is a duplicate for a
659 request we already have in flight. If so defer it until
660 the first request completes.
662 if (ctdb
->tunable
.fetch_collapse
== 1) {
663 if (requeue_duplicate_fetch(ctdb_db
, client
, key
, c
) == 0) {
664 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
666 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
672 /* Dont do READONLY if we dont have a tracking database */
673 if ((c
->flags
& CTDB_WANT_READONLY
) && !ctdb_db
->readonly
) {
674 c
->flags
&= ~CTDB_WANT_READONLY
;
677 if (header
.flags
& CTDB_REC_RO_REVOKE_COMPLETE
) {
678 header
.flags
&= ~CTDB_REC_RO_FLAGS
;
679 CTDB_INCREMENT_STAT(ctdb
, total_ro_revokes
);
680 CTDB_INCREMENT_DB_STAT(ctdb_db
, db_ro_revokes
);
681 if (ctdb_ltdb_store(ctdb_db
, key
, &header
, data
) != 0) {
682 ctdb_fatal(ctdb
, "Failed to write header with cleared REVOKE flag");
684 /* and clear out the tracking data */
685 if (tdb_delete(ctdb_db
->rottdb
, key
) != 0) {
686 DEBUG(DEBUG_ERR
,(__location__
" Failed to clear out trackingdb record\n"));
690 /* if we are revoking, we must defer all other calls until the revoke
693 if (header
.flags
& CTDB_REC_RO_REVOKING_READONLY
) {
694 talloc_free(data
.dptr
);
695 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
697 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, key
, (struct ctdb_req_header
*)c
, daemon_incoming_packet
, client
) != 0) {
698 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
703 if ((header
.dmaster
== ctdb
->pnn
)
704 && (!(c
->flags
& CTDB_WANT_READONLY
))
705 && (header
.flags
& (CTDB_REC_RO_HAVE_DELEGATIONS
|CTDB_REC_RO_HAVE_READONLY
)) ) {
706 header
.flags
|= CTDB_REC_RO_REVOKING_READONLY
;
707 if (ctdb_ltdb_store(ctdb_db
, key
, &header
, data
) != 0) {
708 ctdb_fatal(ctdb
, "Failed to store record with HAVE_DELEGATIONS set");
710 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
712 if (ctdb_start_revoke_ro_record(ctdb
, ctdb_db
, key
, &header
, data
) != 0) {
713 ctdb_fatal(ctdb
, "Failed to start record revoke");
715 talloc_free(data
.dptr
);
717 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, key
, (struct ctdb_req_header
*)c
, daemon_incoming_packet
, client
) != 0) {
718 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
724 dstate
= talloc(client
, struct daemon_call_state
);
725 if (dstate
== NULL
) {
726 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
728 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
731 DEBUG(DEBUG_ERR
,(__location__
" Unable to allocate dstate\n"));
732 CTDB_DECREMENT_STAT(ctdb
, pending_calls
);
735 dstate
->start_time
= timeval_current();
736 dstate
->client
= client
;
737 dstate
->reqid
= c
->hdr
.reqid
;
738 talloc_steal(dstate
, data
.dptr
);
740 call
= dstate
->call
= talloc_zero(dstate
, struct ctdb_call
);
742 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
744 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
747 DEBUG(DEBUG_ERR
,(__location__
" Unable to allocate call\n"));
748 CTDB_DECREMENT_STAT(ctdb
, pending_calls
);
749 CTDB_UPDATE_LATENCY(ctdb
, ctdb_db
, "call_from_client 1", call_latency
, dstate
->start_time
);
753 dstate
->readonly_fetch
= 0;
754 call
->call_id
= c
->callid
;
756 call
->call_data
.dptr
= c
->data
+ c
->keylen
;
757 call
->call_data
.dsize
= c
->calldatalen
;
758 call
->flags
= c
->flags
;
760 if (c
->flags
& CTDB_WANT_READONLY
) {
761 /* client wants readonly record, so translate this into a
762 fetch with header. remember what the client asked for
763 so we can remap the reply back to the proper format for
764 the client in the reply
766 dstate
->client_callid
= call
->call_id
;
767 call
->call_id
= CTDB_FETCH_WITH_HEADER_FUNC
;
768 dstate
->readonly_fetch
= 1;
771 if (header
.dmaster
== ctdb
->pnn
) {
772 state
= ctdb_call_local_send(ctdb_db
, call
, &header
, &data
);
774 state
= ctdb_daemon_call_send_remote(ctdb_db
, call
, &header
);
775 if (ctdb
->tunable
.fetch_collapse
== 1) {
776 /* This request triggered a remote fetch-lock.
777 set up a deferral for this key so any additional
778 fetch-locks are deferred until the current one
781 setup_deferred_fetch_locks(ctdb_db
, call
);
785 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
787 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
791 DEBUG(DEBUG_ERR
,(__location__
" Unable to setup call send\n"));
792 CTDB_DECREMENT_STAT(ctdb
, pending_calls
);
793 CTDB_UPDATE_LATENCY(ctdb
, ctdb_db
, "call_from_client 2", call_latency
, dstate
->start_time
);
796 talloc_steal(state
, dstate
);
797 talloc_steal(client
, state
);
799 state
->async
.fn
= daemon_call_from_client_callback
;
800 state
->async
.private_data
= dstate
;
static void daemon_request_control_from_client(struct ctdb_client *client,
					       struct ctdb_req_control *c);
807 /* data contains a packet from the client */
808 static void daemon_incoming_packet(void *p
, struct ctdb_req_header
*hdr
)
810 struct ctdb_client
*client
= talloc_get_type(p
, struct ctdb_client
);
812 struct ctdb_context
*ctdb
= client
->ctdb
;
814 /* place the packet as a child of a tmp_ctx. We then use
815 talloc_free() below to free it. If any of the calls want
816 to keep it, then they will steal it somewhere else, and the
817 talloc_free() will be a no-op */
818 tmp_ctx
= talloc_new(client
);
819 talloc_steal(tmp_ctx
, hdr
);
821 if (hdr
->ctdb_magic
!= CTDB_MAGIC
) {
822 ctdb_set_error(client
->ctdb
, "Non CTDB packet rejected in daemon\n");
826 if (hdr
->ctdb_version
!= CTDB_VERSION
) {
827 ctdb_set_error(client
->ctdb
, "Bad CTDB version 0x%x rejected in daemon\n", hdr
->ctdb_version
);
831 switch (hdr
->operation
) {
833 CTDB_INCREMENT_STAT(ctdb
, client
.req_call
);
834 daemon_request_call_from_client(client
, (struct ctdb_req_call
*)hdr
);
837 case CTDB_REQ_MESSAGE
:
838 CTDB_INCREMENT_STAT(ctdb
, client
.req_message
);
839 daemon_request_message_from_client(client
, (struct ctdb_req_message
*)hdr
);
842 case CTDB_REQ_CONTROL
:
843 CTDB_INCREMENT_STAT(ctdb
, client
.req_control
);
844 daemon_request_control_from_client(client
, (struct ctdb_req_control
*)hdr
);
848 DEBUG(DEBUG_CRIT
,(__location__
" daemon: unrecognized operation %u\n",
853 talloc_free(tmp_ctx
);
857 called when the daemon gets a incoming packet
859 static void ctdb_daemon_read_cb(uint8_t *data
, size_t cnt
, void *args
)
861 struct ctdb_client
*client
= talloc_get_type(args
, struct ctdb_client
);
862 struct ctdb_req_header
*hdr
;
869 CTDB_INCREMENT_STAT(client
->ctdb
, client_packets_recv
);
871 if (cnt
< sizeof(*hdr
)) {
872 ctdb_set_error(client
->ctdb
, "Bad packet length %u in daemon\n",
876 hdr
= (struct ctdb_req_header
*)data
;
877 if (cnt
!= hdr
->length
) {
878 ctdb_set_error(client
->ctdb
, "Bad header length %u expected %u\n in daemon",
879 (unsigned)hdr
->length
, (unsigned)cnt
);
883 if (hdr
->ctdb_magic
!= CTDB_MAGIC
) {
884 ctdb_set_error(client
->ctdb
, "Non CTDB packet rejected\n");
888 if (hdr
->ctdb_version
!= CTDB_VERSION
) {
889 ctdb_set_error(client
->ctdb
, "Bad CTDB version 0x%x rejected in daemon\n", hdr
->ctdb_version
);
893 DEBUG(DEBUG_DEBUG
,(__location__
" client request %u of type %u length %u from "
894 "node %u to %u\n", hdr
->reqid
, hdr
->operation
, hdr
->length
,
895 hdr
->srcnode
, hdr
->destnode
));
897 /* it is the responsibility of the incoming packet function to free 'data' */
898 daemon_incoming_packet(client
, hdr
);
902 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list
*client_pid
)
904 if (client_pid
->ctdb
->client_pids
!= NULL
) {
905 DLIST_REMOVE(client_pid
->ctdb
->client_pids
, client_pid
);
912 static void ctdb_accept_client(struct event_context
*ev
, struct fd_event
*fde
,
913 uint16_t flags
, void *private_data
)
915 struct sockaddr_un addr
;
918 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
919 struct ctdb_client
*client
;
920 struct ctdb_client_pid_list
*client_pid
;
923 memset(&addr
, 0, sizeof(addr
));
925 fd
= accept(ctdb
->daemon
.sd
, (struct sockaddr
*)&addr
, &len
);
931 set_close_on_exec(fd
);
933 DEBUG(DEBUG_DEBUG
,(__location__
" Created SOCKET FD:%d to connected child\n", fd
));
935 client
= talloc_zero(ctdb
, struct ctdb_client
);
936 if (ctdb_get_peer_pid(fd
, &peer_pid
) == 0) {
937 DEBUG(DEBUG_INFO
,("Connected client with pid:%u\n", (unsigned)peer_pid
));
942 client
->client_id
= ctdb_reqid_new(ctdb
, client
);
943 client
->pid
= peer_pid
;
945 client_pid
= talloc(client
, struct ctdb_client_pid_list
);
946 if (client_pid
== NULL
) {
947 DEBUG(DEBUG_ERR
,("Failed to allocate client pid structure\n"));
952 client_pid
->ctdb
= ctdb
;
953 client_pid
->pid
= peer_pid
;
954 client_pid
->client
= client
;
956 DLIST_ADD(ctdb
->client_pids
, client_pid
);
958 client
->queue
= ctdb_queue_setup(ctdb
, client
, fd
, CTDB_DS_ALIGNMENT
,
959 ctdb_daemon_read_cb
, client
,
960 "client-%u", client
->pid
);
962 talloc_set_destructor(client
, ctdb_client_destructor
);
963 talloc_set_destructor(client_pid
, ctdb_clientpid_destructor
);
970 create a unix domain socket and bind it
971 return a file descriptor open on the socket
973 static int ux_socket_bind(struct ctdb_context
*ctdb
)
975 struct sockaddr_un addr
;
977 ctdb
->daemon
.sd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
978 if (ctdb
->daemon
.sd
== -1) {
982 memset(&addr
, 0, sizeof(addr
));
983 addr
.sun_family
= AF_UNIX
;
984 strncpy(addr
.sun_path
, ctdb
->daemon
.name
, sizeof(addr
.sun_path
));
986 /* First check if an old ctdbd might be running */
987 if (connect(ctdb
->daemon
.sd
,
988 (struct sockaddr
*)&addr
, sizeof(addr
)) == 0) {
990 ("Something is already listening on ctdb socket '%s'\n",
995 /* Remove any old socket */
996 unlink(ctdb
->daemon
.name
);
998 set_close_on_exec(ctdb
->daemon
.sd
);
999 set_nonblocking(ctdb
->daemon
.sd
);
1001 if (bind(ctdb
->daemon
.sd
, (struct sockaddr
*)&addr
, sizeof(addr
)) == -1) {
1002 DEBUG(DEBUG_CRIT
,("Unable to bind on ctdb socket '%s'\n", ctdb
->daemon
.name
));
1006 if (chown(ctdb
->daemon
.name
, geteuid(), getegid()) != 0 ||
1007 chmod(ctdb
->daemon
.name
, 0700) != 0) {
1008 DEBUG(DEBUG_CRIT
,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb
->daemon
.name
));
1013 if (listen(ctdb
->daemon
.sd
, 100) != 0) {
1014 DEBUG(DEBUG_CRIT
,("Unable to listen on ctdb socket '%s'\n", ctdb
->daemon
.name
));
1021 close(ctdb
->daemon
.sd
);
1022 ctdb
->daemon
.sd
= -1;
1026 static void initialise_node_flags (struct ctdb_context
*ctdb
)
1028 if (ctdb
->pnn
== -1) {
1029 ctdb_fatal(ctdb
, "PNN is set to -1 (unknown value)");
1032 ctdb
->nodes
[ctdb
->pnn
]->flags
&= ~NODE_FLAGS_DISCONNECTED
;
1034 /* do we start out in DISABLED mode? */
1035 if (ctdb
->start_as_disabled
!= 0) {
1036 DEBUG(DEBUG_INFO
, ("This node is configured to start in DISABLED state\n"));
1037 ctdb
->nodes
[ctdb
->pnn
]->flags
|= NODE_FLAGS_DISABLED
;
1039 /* do we start out in STOPPED mode? */
1040 if (ctdb
->start_as_stopped
!= 0) {
1041 DEBUG(DEBUG_INFO
, ("This node is configured to start in STOPPED state\n"));
1042 ctdb
->nodes
[ctdb
->pnn
]->flags
|= NODE_FLAGS_STOPPED
;
1046 static void ctdb_setup_event_callback(struct ctdb_context
*ctdb
, int status
,
1050 ctdb_die(ctdb
, "Failed to run setup event");
1052 ctdb_run_notification_script(ctdb
, "setup");
1054 ctdb_set_runstate(ctdb
, CTDB_RUNSTATE_FIRST_RECOVERY
);
1056 /* tell all other nodes we've just started up */
1057 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
,
1058 0, CTDB_CONTROL_STARTUP
, 0,
1059 CTDB_CTRL_FLAG_NOREPLY
,
1060 tdb_null
, NULL
, NULL
);
1062 /* Start the recovery daemon */
1063 if (ctdb_start_recoverd(ctdb
) != 0) {
1064 DEBUG(DEBUG_ALERT
,("Failed to start recovery daemon\n"));
1068 ctdb_start_periodic_events(ctdb
);
/* Times of the most recent BEFORE_WAIT / AFTER_WAIT tevent trace points */
static struct timeval tevent_before_wait_ts;
static struct timeval tevent_after_wait_ts;
1074 static void ctdb_tevent_trace(enum tevent_trace_point tp
,
1077 struct timeval diff
;
1080 if (getpid() != ctdbd_pid
) {
1084 now
= timeval_current();
1087 case TEVENT_TRACE_BEFORE_WAIT
:
1088 if (!timeval_is_zero(&tevent_after_wait_ts
)) {
1089 diff
= timeval_until(&tevent_after_wait_ts
, &now
);
1090 if (diff
.tv_sec
> 3) {
1092 ("Handling event took %ld seconds!\n",
1096 tevent_before_wait_ts
= now
;
1099 case TEVENT_TRACE_AFTER_WAIT
:
1100 if (!timeval_is_zero(&tevent_before_wait_ts
)) {
1101 diff
= timeval_until(&tevent_before_wait_ts
, &now
);
1102 if (diff
.tv_sec
> 3) {
1104 ("No event for %ld seconds!\n",
1108 tevent_after_wait_ts
= now
;
1112 /* Do nothing for future tevent trace points */ ;
1116 static void ctdb_remove_pidfile(void)
1118 if (ctdbd_pidfile
!= NULL
&& !ctdb_is_child_process()) {
1119 if (unlink(ctdbd_pidfile
) == 0) {
1120 DEBUG(DEBUG_NOTICE
, ("Removed PID file %s\n",
1123 DEBUG(DEBUG_WARNING
, ("Failed to Remove PID file %s\n",
1129 static void ctdb_create_pidfile(pid_t pid
)
1131 if (ctdbd_pidfile
!= NULL
) {
1134 fp
= fopen(ctdbd_pidfile
, "w");
1137 ("Failed to open PID file %s\n", ctdbd_pidfile
));
1141 fprintf(fp
, "%d\n", pid
);
1143 DEBUG(DEBUG_NOTICE
, ("Created PID file %s\n", ctdbd_pidfile
));
1144 atexit(ctdb_remove_pidfile
);
1149 start the protocol going as a daemon
/*
  bring the daemon up and enter the main event loop.

  ctdb:                the daemon context
  do_fork:             when true fork() is called and only the child
                       carries on with the startup below
  use_syslog:          its use is not visible in this listing
                       (presumably it guards start_syslog_daemon())
  public_address_list: optional public addresses file; when set it is
                       stored in ctdb->public_addresses_file and loaded
                       with ctdb_set_public_addresses()

  Startup order, as far as it is visible here:
   - bind the unix domain socket (ux_socket_bind)
   - optionally fork, reopen all tdbs, point stdin at /dev/null
   - block SIGPIPE, record our pid in ctdbd_pid and ctdb->ctdbd_pid,
     create the PID file and register print_exit_message with atexit()
   - optionally switch to the realtime scheduler
   - create the event context, allow nested loops, install
     ctdb_tevent_trace and set up tevent logging
   - SIGCHLD handler, child logging, syslog daemon, statistics
   - force recovery mode active, enter RUNSTATE_INIT, run the "init"
     event (ctdb_die on failure) and the "init" notification
   - initialise the transport ("tcp", or "ib" when built with
     USE_INFINIBAND); ctdb->methods must be set afterwards
   - node flags, public addresses, interface monitoring
   - attach to databases and take the initial freeze
   - start accepting clients on ctdb->daemon.sd (ctdb_accept_client)
   - release IPs from previous runs unless disable_ip_failover is set
   - start the transport, enter RUNSTATE_SETUP and schedule the
     "setup" event with ctdb_setup_event_callback
   - lock down memory and call event_loop_wait(), which is not
     expected to return

  NOTE(review): the "Starting CTDBD" message is logged at DEBUG_ERR
  and prints ctdbd_pid with %u - confirm both are intended.
  NOTE(review): domain_socket_name is duplicated onto
  talloc_autofree_context() but no later use is visible here.
 */
1151 int ctdb_start_daemon(struct ctdb_context
*ctdb
, bool do_fork
, bool use_syslog
, const char *public_address_list
)
1154 struct fd_event
*fde
;
1155 const char *domain_socket_name
;
1157 /* create a unix domain stream socket to listen to */
1158 res
= ux_socket_bind(ctdb
);
1160 DEBUG(DEBUG_ALERT
,("Cannot continue. Exiting!\n"));
1164 if (do_fork
&& fork()) {
1168 tdb_reopen_all(false);
/* open() must hand back descriptor 0 for /dev/null to become stdin */
1173 if (open("/dev/null", O_RDONLY
) != 0) {
1174 DEBUG(DEBUG_ALERT
,(__location__
" Failed to setup stdin on /dev/null\n"));
1178 block_signal(SIGPIPE
);
1180 ctdbd_pid
= getpid();
1181 ctdb
->ctdbd_pid
= ctdbd_pid
;
1182 DEBUG(DEBUG_ERR
, ("Starting CTDBD (Version %s) as PID: %u\n",
1183 CTDB_VERSION_STRING
, ctdbd_pid
));
1184 ctdb_create_pidfile(ctdb
->ctdbd_pid
);
1186 /* Make sure we log something when the daemon terminates.
1187 * This must be the first exit handler to run (so the last to
1190 atexit(print_exit_message
);
1192 if (ctdb
->do_setsched
) {
1193 /* try to set us up as realtime */
1194 ctdb_set_scheduler(ctdb
);
1197 /* ensure the socket is deleted on exit of the daemon */
1198 domain_socket_name
= talloc_strdup(talloc_autofree_context(), ctdb
->daemon
.name
);
1199 if (domain_socket_name
== NULL
) {
1200 DEBUG(DEBUG_ALERT
,(__location__
" talloc_strdup failed.\n"));
1204 ctdb
->ev
= event_context_init(NULL
);
1205 tevent_loop_allow_nesting(ctdb
->ev
);
1206 tevent_set_trace_callback(ctdb
->ev
, ctdb_tevent_trace
, NULL
);
1207 ret
= ctdb_init_tevent_logging(ctdb
);
1209 DEBUG(DEBUG_ALERT
,("Failed to initialize TEVENT logging\n"));
1213 /* set up a handler to pick up sigchld */
1214 if (ctdb_init_sigchld(ctdb
) == NULL
) {
1215 DEBUG(DEBUG_CRIT
,("Failed to set up signal handler for SIGCHLD\n"));
1219 ctdb_set_child_logging(ctdb
);
1221 if (start_syslog_daemon(ctdb
)) {
1222 DEBUG(DEBUG_CRIT
, ("Failed to start syslog daemon\n"));
1227 /* initialize statistics collection */
1228 ctdb_statistics_init(ctdb
);
1230 /* force initial recovery for election */
1231 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
1233 ctdb_set_runstate(ctdb
, CTDB_RUNSTATE_INIT
);
1234 ret
= ctdb_event_script(ctdb
, CTDB_EVENT_INIT
);
1236 ctdb_die(ctdb
, "Failed to run init event\n");
1238 ctdb_run_notification_script(ctdb
, "init");
/* the transport init functions are declared locally, right before use */
1240 if (strcmp(ctdb
->transport
, "tcp") == 0) {
1241 int ctdb_tcp_init(struct ctdb_context
*);
1242 ret
= ctdb_tcp_init(ctdb
);
1244 #ifdef USE_INFINIBAND
1245 if (strcmp(ctdb
->transport
, "ib") == 0) {
1246 int ctdb_ibw_init(struct ctdb_context
*);
1247 ret
= ctdb_ibw_init(ctdb
);
1251 DEBUG(DEBUG_ERR
,("Failed to initialise transport '%s'\n", ctdb
->transport
));
1255 if (ctdb
->methods
== NULL
) {
1256 DEBUG(DEBUG_ALERT
,(__location__
" Can not initialize transport. ctdb->methods is NULL\n"));
1257 ctdb_fatal(ctdb
, "transport is unavailable. can not initialize.");
1260 /* initialise the transport */
1261 if (ctdb
->methods
->initialise(ctdb
) != 0) {
1262 ctdb_fatal(ctdb
, "transport failed to initialise");
1265 initialise_node_flags(ctdb
);
1267 if (public_address_list
) {
1268 ctdb
->public_addresses_file
= public_address_list
;
1269 ret
= ctdb_set_public_addresses(ctdb
, true);
1271 DEBUG(DEBUG_ALERT
,("Unable to setup public address list\n"));
1274 if (ctdb
->do_checkpublicip
) {
1275 ctdb_start_monitoring_interfaces(ctdb
);
1280 /* attach to existing databases */
1281 if (ctdb_attach_databases(ctdb
) != 0) {
1282 ctdb_fatal(ctdb
, "Failed to attach to databases\n");
1285 /* start frozen, then let the first election sort things out */
1286 if (!ctdb_blocking_freeze(ctdb
)) {
1287 ctdb_fatal(ctdb
, "Failed to get initial freeze\n");
1290 /* now start accepting clients, only can do this once frozen */
1291 fde
= event_add_fd(ctdb
->ev
, ctdb
, ctdb
->daemon
.sd
,
1293 ctdb_accept_client
, ctdb
);
1295 ctdb_fatal(ctdb
, "Failed to add daemon socket to event loop");
1297 tevent_fd_set_auto_close(fde
);
1299 /* release any IPs we hold from previous runs of the daemon */
1300 if (ctdb
->tunable
.disable_ip_failover
== 0) {
1301 ctdb_release_all_ips(ctdb
);
1304 /* Start the transport */
1305 if (ctdb
->methods
->start(ctdb
) != 0) {
1306 DEBUG(DEBUG_ALERT
,("transport failed to start!\n"));
1307 ctdb_fatal(ctdb
, "transport failed to start");
1310 /* Recovery daemon and timed events are started from the
1311 * callback, only after the setup event completes
1314 ctdb_set_runstate(ctdb
, CTDB_RUNSTATE_SETUP
);
1315 ret
= ctdb_event_script_callback(ctdb
,
1317 ctdb_setup_event_callback
,
1324 DEBUG(DEBUG_CRIT
,("Failed to set up 'setup' event\n"));
1328 ctdb_lockdown_memory(ctdb
);
1330 /* go into a wait loop to allow other nodes to complete */
1331 event_loop_wait(ctdb
->ev
);
1333 DEBUG(DEBUG_CRIT
,("event_loop_wait() returned. this should not happen\n"));
1338 allocate a packet for use in daemon<->daemon communication
/*
  allocate and pre-fill a packet through the active transport.

  ctdb:      daemon context; supplies the transport methods, the
             current generation and our pnn
  mem_ctx:   talloc parent handed to the transport's allocate_pkt()
  operation: stored in hdr->operation
  length:    requested packet length; raised to slength if smaller
  slength:   number of leading bytes that are zeroed (presumably the
             size of the fixed part of the packet structure)
  type:      used as the talloc name of the returned chunk (its
             declaration is not visible in this listing)

  The allocation size is length rounded up with CTDB_DS_ALIGNMENT mask
  arithmetic (this assumes the alignment is a power of two - TODO
  confirm).  hdr->length is set to the unrounded length.  When
  ctdb->methods is NULL or the transport cannot allocate, a message is
  logged (presumably NULL is returned; the return statements are not
  visible here).

  NOTE(review): ctdb->vnn_map is dereferenced for the generation
  without a visible NULL check.
 */
1340 struct ctdb_req_header
*_ctdb_transport_allocate(struct ctdb_context
*ctdb
,
1341 TALLOC_CTX
*mem_ctx
,
1342 enum ctdb_operation operation
,
1343 size_t length
, size_t slength
,
1347 struct ctdb_req_header
*hdr
;
1349 length
= MAX(length
, slength
);
1350 size
= (length
+(CTDB_DS_ALIGNMENT
-1)) & ~(CTDB_DS_ALIGNMENT
-1);
1352 if (ctdb
->methods
== NULL
) {
1353 DEBUG(DEBUG_INFO
,(__location__
" Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1354 operation
, (unsigned)length
));
1358 hdr
= (struct ctdb_req_header
*)ctdb
->methods
->allocate_pkt(mem_ctx
, size
);
1360 DEBUG(DEBUG_ERR
,("Unable to allocate transport packet for operation %u of length %u\n",
1361 operation
, (unsigned)length
));
1364 talloc_set_name_const(hdr
, type
);
/* only the first slength bytes are cleared, the rest is left as allocated */
1365 memset(hdr
, 0, slength
);
1366 hdr
->length
= length
;
1367 hdr
->operation
= operation
;
1368 hdr
->ctdb_magic
= CTDB_MAGIC
;
1369 hdr
->ctdb_version
= CTDB_VERSION
;
1370 hdr
->generation
= ctdb
->vnn_map
->generation
;
1371 hdr
->srcnode
= ctdb
->pnn
;
/*
  per-request state for a control submitted by a local client.

  next/prev: links for the destination node's pending_controls list
  client:    the client that sent the control and receives the reply
  c:         the request packet (talloc_steal'ed onto the state by
             daemon_request_control_from_client())
  node:      the destination node whose pending_controls list holds
             this state

  The code below also uses a reqid member whose declaration is not
  visible in this listing.
 */
1376 struct daemon_control_state
{
1377 struct daemon_control_state
*next
, *prev
;
1378 struct ctdb_client
*client
;
1379 struct ctdb_req_control
*c
;
1381 struct ctdb_node
*node
;
1385 callback when a control reply comes in
/*
  deliver the result of a control back to the local client.

  ctdb:         daemon context
  status:       result code of the control (its copy into the reply is
                not visible in this listing)
  data:         reply payload, copied to the start of r->data
  errormsg:     error text, appended after the payload without a
                terminating NUL (errorlen is strlen(errormsg))
  private_data: the struct daemon_control_state of the request

  A CTDB_REPLY_CONTROL packet big enough for payload plus error text is
  allocated as a talloc child of the state, tagged with the request's
  reqid and queued to the client with daemon_queue_send().

  NOTE(review): ctdb_daemon_cancel_controls() calls this with tdb_null,
  so data.dptr may be NULL with data.dsize 0 when it reaches memcpy();
  a NULL source is undefined behaviour even for a zero length -
  consider guarding the copy.
  NOTE(review): presumably the strlen(errormsg) calls are guarded by a
  NULL check on errormsg; the guard is not visible here.
 */
1387 static void daemon_control_callback(struct ctdb_context
*ctdb
,
1388 int32_t status
, TDB_DATA data
,
1389 const char *errormsg
,
1392 struct daemon_control_state
*state
= talloc_get_type(private_data
,
1393 struct daemon_control_state
);
1394 struct ctdb_client
*client
= state
->client
;
1395 struct ctdb_reply_control
*r
;
1399 /* construct a message to send to the client containing the data */
1400 len
= offsetof(struct ctdb_reply_control
, data
) + data
.dsize
;
1402 len
+= strlen(errormsg
);
1404 r
= ctdbd_allocate_pkt(ctdb
, state
, CTDB_REPLY_CONTROL
, len
,
1405 struct ctdb_reply_control
);
1406 CTDB_NO_MEMORY_VOID(ctdb
, r
);
1408 r
->hdr
.reqid
= state
->reqid
;
1410 r
->datalen
= data
.dsize
;
1412 memcpy(&r
->data
[0], data
.dptr
, data
.dsize
);
/* the error text goes directly behind the payload */
1414 r
->errorlen
= strlen(errormsg
);
1415 memcpy(&r
->data
[r
->datalen
], errormsg
, r
->errorlen
);
1418 ret
= daemon_queue_send(client
, &r
->hdr
);
1425 fail all pending controls to a disconnected node
/*
  fail every control that is still waiting for a reply from a node.

  ctdb: daemon context
  node: the node whose pending_controls list is drained

  Each state is unlinked from the list first and then completed through
  daemon_control_callback() with status -1, no data and the error text
  "node is disconnected".

  NOTE(review): the status is written as (uint32_t)-1 although the
  callback takes an int32_t.
 */
1427 void ctdb_daemon_cancel_controls(struct ctdb_context
*ctdb
, struct ctdb_node
*node
)
1429 struct daemon_control_state
*state
;
1430 while ((state
= node
->pending_controls
)) {
1431 DLIST_REMOVE(node
->pending_controls
, state
);
1432 daemon_control_callback(ctdb
, (uint32_t)-1, tdb_null
,
1433 "node is disconnected", state
);
1438 destroy a daemon_control_state
/*
  talloc destructor for a daemon_control_state: unlinks the state from
  its node's pending_controls list.

  NOTE(review): daemon_request_control_from_client() only assigns
  state->node for a valid destination pnn - presumably the removal is
  guarded by a check on state->node; the guard is not visible here.
 */
1440 static int daemon_control_destructor(struct daemon_control_state
*state
)
1443 DLIST_REMOVE(state
->node
->pending_controls
, state
);
1449 this is called when the ctdb daemon received a ctdb request control
1450 from a local client over the unix domain socket
1452 static void daemon_request_control_from_client(struct ctdb_client
*client
,
1453 struct ctdb_req_control
*c
)
1457 struct daemon_control_state
*state
;
1458 TALLOC_CTX
*tmp_ctx
= talloc_new(client
);
1460 if (c
->hdr
.destnode
== CTDB_CURRENT_NODE
) {
1461 c
->hdr
.destnode
= client
->ctdb
->pnn
;
1464 state
= talloc(client
, struct daemon_control_state
);
1465 CTDB_NO_MEMORY_VOID(client
->ctdb
, state
);
1467 state
->client
= client
;
1468 state
->c
= talloc_steal(state
, c
);
1469 state
->reqid
= c
->hdr
.reqid
;
1470 if (ctdb_validate_pnn(client
->ctdb
, c
->hdr
.destnode
)) {
1471 state
->node
= client
->ctdb
->nodes
[c
->hdr
.destnode
];
1472 DLIST_ADD(state
->node
->pending_controls
, state
);
1477 talloc_set_destructor(state
, daemon_control_destructor
);
1479 if (c
->flags
& CTDB_CTRL_FLAG_NOREPLY
) {
1480 talloc_steal(tmp_ctx
, state
);
1483 data
.dptr
= &c
->data
[0];
1484 data
.dsize
= c
->datalen
;
1485 res
= ctdb_daemon_send_control(client
->ctdb
, c
->hdr
.destnode
,
1486 c
->srvid
, c
->opcode
, client
->client_id
,
1488 data
, daemon_control_callback
,
1491 DEBUG(DEBUG_ERR
,(__location__
" Failed to send control to remote node %u\n",
1495 talloc_free(tmp_ctx
);
1499 register a call function
1501 int ctdb_daemon_set_call(struct ctdb_context
*ctdb
, uint32_t db_id
,
1502 ctdb_fn_t fn
, int id
)
1504 struct ctdb_registered_call
*call
;
1505 struct ctdb_db_context
*ctdb_db
;
1507 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
1508 if (ctdb_db
== NULL
) {
1512 call
= talloc(ctdb_db
, struct ctdb_registered_call
);
1516 DLIST_ADD(ctdb_db
->calls
, call
);
1523 this local messaging handler is ugly, but is needed to prevent
1524 recursion in ctdb_send_message() when the destination node is the
1525 same as the source node
/*
  a message addressed to this node, parked until the event loop runs
  ctdb_local_message_trigger().

  ctdb: daemon context used for the dispatch

  The code below also uses srvid and data members whose declarations
  are not visible in this listing.
 */
1527 struct ctdb_local_message
{
1528 struct ctdb_context
*ctdb
;
/*
  timed event handler that delivers a parked local message.

  private_data is the struct ctdb_local_message created by
  ctdb_local_message().  The message is handed to
  ctdb_dispatch_message(); a dispatch failure is logged together with
  the srvid.  ev, te and t are not used in the visible code.
 */
1533 static void ctdb_local_message_trigger(struct event_context
*ev
, struct timed_event
*te
,
1534 struct timeval t
, void *private_data
)
1536 struct ctdb_local_message
*m
= talloc_get_type(private_data
,
1537 struct ctdb_local_message
);
1540 res
= ctdb_dispatch_message(m
->ctdb
, m
->srvid
, m
->data
);
1542 DEBUG(DEBUG_ERR
, (__location__
" Failed to dispatch message for srvid=%llu\n",
1543 (unsigned long long)m
->srvid
));
/*
  queue a message to ourselves for delivery from the event loop.

  ctdb:  daemon context; talloc parent of the parked message
  srvid: service id the message is addressed to
  data:  message payload

  The payload is duplicated with talloc_memdup() as a child of the
  parked message, so the caller's buffer does not have to stay valid.
  Delivery happens through a zero-delay timed event that runs
  ctdb_local_message_trigger(); the event is allocated on the message.

  NOTE(review): the result of event_add_timed() is not checked in the
  visible code.
  NOTE(review): no line that releases the message after dispatch is
  visible in this listing - confirm it is freed once delivered.
 */
1548 static int ctdb_local_message(struct ctdb_context
*ctdb
, uint64_t srvid
, TDB_DATA data
)
1550 struct ctdb_local_message
*m
;
1551 m
= talloc(ctdb
, struct ctdb_local_message
);
1552 CTDB_NO_MEMORY(ctdb
, m
);
/* take a private copy of the payload, owned by the message */
1557 m
->data
.dptr
= talloc_memdup(m
, m
->data
.dptr
, m
->data
.dsize
);
1558 if (m
->data
.dptr
== NULL
) {
1563 /* this needs to be done as an event to prevent recursion */
1564 event_add_timed(ctdb
->ev
, m
, timeval_zero(), ctdb_local_message_trigger
, m
);
/*
  send a message to a node.

  ctdb:  daemon context
  pnn:   destination node number
  srvid: service id the message is addressed to (its copy into the
         packet is not visible in this listing)
  data:  message payload, copied into the packet

  The call fails with a log message while the transport is down
  (ctdb->methods == NULL).  A message to our own pnn is handed to
  ctdb_local_message() instead of the transport.  Otherwise a
  CTDB_REQ_MESSAGE packet is allocated on ctdb, filled in and passed
  to ctdb_queue_packet().
 */
1571 int ctdb_daemon_send_message(struct ctdb_context
*ctdb
, uint32_t pnn
,
1572 uint64_t srvid
, TDB_DATA data
)
1574 struct ctdb_req_message
*r
;
1577 if (ctdb
->methods
== NULL
) {
1578 DEBUG(DEBUG_INFO
,(__location__
" Failed to send message. Transport is DOWN\n"));
1582 /* see if this is a message to ourselves */
1583 if (pnn
== ctdb
->pnn
) {
1584 return ctdb_local_message(ctdb
, srvid
, data
);
1587 len
= offsetof(struct ctdb_req_message
, data
) + data
.dsize
;
1588 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_MESSAGE
, len
,
1589 struct ctdb_req_message
);
1590 CTDB_NO_MEMORY(ctdb
, r
);
1592 r
->hdr
.destnode
= pnn
;
1594 r
->datalen
= data
.dsize
;
1595 memcpy(&r
->data
[0], data
.dptr
, data
.dsize
);
1597 ctdb_queue_packet(ctdb
, &r
->hdr
);
/*
  one notification registered by a client.

  next/prev: links for the client's notify list
  ctdb:      daemon context used to send the notification

  The code below also uses srvid and data members whose declarations
  are not visible in this listing.
 */
1605 struct ctdb_client_notify_list
{
1606 struct ctdb_client_notify_list
*next
, *prev
;
1607 struct ctdb_context
*ctdb
;
/*
  talloc destructor for a registered notification: when the entry is
  freed, its stored data is sent for its srvid to
  CTDB_BROADCAST_CONNECTED.  A failure to send is logged.

  NOTE(review): the routine "Sending client notify message" line is
  logged at DEBUG_ERR - confirm the level is intended.
 */
1613 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list
*nl
)
1617 DEBUG(DEBUG_ERR
,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl
->srvid
));
1619 ret
= ctdb_daemon_send_message(nl
->ctdb
, CTDB_BROADCAST_CONNECTED
, (unsigned long long)nl
->srvid
, nl
->data
);
1621 DEBUG(DEBUG_ERR
,("Failed to send client notify message\n"));
1627 int32_t ctdb_control_register_notify(struct ctdb_context
*ctdb
, uint32_t client_id
, TDB_DATA indata
)
1629 struct ctdb_client_notify_register
*notify
= (struct ctdb_client_notify_register
*)indata
.dptr
;
1630 struct ctdb_client
*client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
1631 struct ctdb_client_notify_list
*nl
;
1633 DEBUG(DEBUG_INFO
,("Register srvid %llu for client %d\n", (unsigned long long)notify
->srvid
, client_id
));
1635 if (indata
.dsize
< offsetof(struct ctdb_client_notify_register
, notify_data
)) {
1636 DEBUG(DEBUG_ERR
,(__location__
" Too little data in control : %d\n", (int)indata
.dsize
));
1640 if (indata
.dsize
!= (notify
->len
+ offsetof(struct ctdb_client_notify_register
, notify_data
))) {
1641 DEBUG(DEBUG_ERR
,(__location__
" Wrong amount of data in control. Got %d, expected %d\n", (int)indata
.dsize
, (int)(notify
->len
+ offsetof(struct ctdb_client_notify_register
, notify_data
))));
1646 if (client
== NULL
) {
1647 DEBUG(DEBUG_ERR
,(__location__
" Could not find client parent structure. You can not send this control to a remote node\n"));
1651 for(nl
=client
->notify
; nl
; nl
=nl
->next
) {
1652 if (nl
->srvid
== notify
->srvid
) {
1657 DEBUG(DEBUG_ERR
,(__location__
" Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify
->srvid
));
1661 nl
= talloc(client
, struct ctdb_client_notify_list
);
1662 CTDB_NO_MEMORY(ctdb
, nl
);
1664 nl
->srvid
= notify
->srvid
;
1665 nl
->data
.dsize
= notify
->len
;
1666 nl
->data
.dptr
= talloc_size(nl
, nl
->data
.dsize
);
1667 CTDB_NO_MEMORY(ctdb
, nl
->data
.dptr
);
1668 memcpy(nl
->data
.dptr
, notify
->notify_data
, nl
->data
.dsize
);
1670 DLIST_ADD(client
->notify
, nl
);
1671 talloc_set_destructor(nl
, ctdb_client_notify_destructor
);
/*
  remove a notification previously registered by a local client.

  ctdb:      daemon context
  client_id: reqid of the client sending the control
  indata:    a struct ctdb_client_notify_deregister naming the srvid

  The client's notify list is searched for the srvid.  The entry is
  unlinked and its destructor is cleared, so releasing it does not
  send the notification.  An unknown client or an srvid that was never
  registered is logged as an error.

  NOTE(review): notify->srvid is read without a size check on indata
  in this function - presumably the caller validates the size; confirm
  against the control dispatcher.
 */
1676 int32_t ctdb_control_deregister_notify(struct ctdb_context
*ctdb
, uint32_t client_id
, TDB_DATA indata
)
1678 struct ctdb_client_notify_deregister
*notify
= (struct ctdb_client_notify_deregister
*)indata
.dptr
;
1679 struct ctdb_client
*client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
1680 struct ctdb_client_notify_list
*nl
;
1682 DEBUG(DEBUG_INFO
,("Deregister srvid %llu for client %d\n", (unsigned long long)notify
->srvid
, client_id
));
1684 if (client
== NULL
) {
1685 DEBUG(DEBUG_ERR
,(__location__
" Could not find client parent structure. You can not send this control to a remote node\n"));
1689 for(nl
=client
->notify
; nl
; nl
=nl
->next
) {
1690 if (nl
->srvid
== notify
->srvid
) {
1695 DEBUG(DEBUG_ERR
,(__location__
" No notification for srvid:%llu found for this client\n", (unsigned long long)notify
->srvid
));
1699 DLIST_REMOVE(client
->notify
, nl
);
/* clear the destructor so that no notification is sent for this entry */
1700 talloc_set_destructor(nl
, NULL
);
/*
  look up the client structure belonging to a process id.

  ctdb: daemon context holding the client_pids list
  pid:  process id to search for

  Walks ctdb->client_pids and returns the client of the first entry
  whose pid matches (presumably NULL when there is none - the final
  return is not visible in this listing).
 */
1706 struct ctdb_client
*ctdb_find_client_by_pid(struct ctdb_context
*ctdb
, pid_t pid
)
1708 struct ctdb_client_pid_list
*client_pid
;
1710 for (client_pid
= ctdb
->client_pids
; client_pid
; client_pid
=client_pid
->next
) {
1711 if (client_pid
->pid
== pid
) {
1712 return client_pid
->client
;
1719 /* This control is used by samba when probing if a process (of a samba daemon)
1721 Samba does this when it needs/wants to check if a subrecord in one of the
1722 databases is still valid, or if it is stale and can be removed.
1723 If the node is in banned or stopped state we just kill off the samba
1724 process holding this sub-record and return to the calling samba that
1725 the process does not exist.
1726 This allows us to forcefully recall subrecords registered by samba processes
1727 on banned and stopped nodes.
/*
  handler for the "does this process exist" control.

  ctdb: daemon context
  pid:  process id to probe

  When our own node carries NODE_FLAGS_BANNED or NODE_FLAGS_STOPPED, a
  client registered with this pid is looked up and, if found, its
  client structure is freed after a NOTICE log entry (presumably the
  control then reports the process as gone - the return in this branch
  is not visible here).

  Otherwise the result of kill(pid, 0) is returned: 0 if the process
  can be signalled, -1 if not.
 */
1729 int32_t ctdb_control_process_exists(struct ctdb_context
*ctdb
, pid_t pid
)
1731 struct ctdb_client
*client
;
1733 if (ctdb
->nodes
[ctdb
->pnn
]->flags
& (NODE_FLAGS_BANNED
|NODE_FLAGS_STOPPED
)) {
1734 client
= ctdb_find_client_by_pid(ctdb
, pid
);
1735 if (client
!= NULL
) {
1736 DEBUG(DEBUG_NOTICE
,(__location__
" Killing client with pid:%d on banned/stopped node\n", (int)pid
));
1737 talloc_free(client
);
/* signal 0 only probes for the process, nothing is delivered */
1742 return kill(pid
, 0);
/*
  orderly shutdown of the daemon.

  ctdb:      daemon context
  exit_code: its use is not visible in this listing (presumably the
             process exit status)

  If the runstate is already CTDB_RUNSTATE_SHUTDOWN this is logged and
  the sequence is not run a second time.  Otherwise the runstate is
  switched to SHUTDOWN, then the recovery daemon, keepalives and
  monitoring are stopped, all public IPs are released, the "shutdown"
  event is run and, if a transport is loaded, its shutdown method is
  called.

  NOTE(review): the result of ctdb_event_script() is ignored in the
  visible code.
 */
1745 void ctdb_shutdown_sequence(struct ctdb_context
*ctdb
, int exit_code
)
1747 if (ctdb
->runstate
== CTDB_RUNSTATE_SHUTDOWN
) {
1748 DEBUG(DEBUG_NOTICE
,("Already shutting down so will not proceed.\n"));
1752 DEBUG(DEBUG_NOTICE
,("Shutdown sequence commencing.\n"));
1753 ctdb_set_runstate(ctdb
, CTDB_RUNSTATE_SHUTDOWN
);
1754 ctdb_stop_recoverd(ctdb
);
1755 ctdb_stop_keepalive(ctdb
);
1756 ctdb_stop_monitoring(ctdb
);
1757 ctdb_release_all_ips(ctdb
);
1758 ctdb_event_script(ctdb
, CTDB_EVENT_SHUTDOWN
);
1759 if (ctdb
->methods
!= NULL
) {
1760 ctdb
->methods
->shutdown(ctdb
);
1763 DEBUG(DEBUG_NOTICE
,("Shutdown sequence complete, exiting.\n"));