4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb.h"
28 #include "../include/ctdb_private.h"
30 static void daemon_incoming_packet(void *, struct ctdb_req_header
*);
33 handler for when a node changes its flags
35 static void flag_change_handler(struct ctdb_context
*ctdb
, uint64_t srvid
,
36 TDB_DATA data
, void *private_data
)
38 struct ctdb_node_flag_change
*c
= (struct ctdb_node_flag_change
*)data
.dptr
;
40 if (data
.dsize
!= sizeof(*c
) || !ctdb_validate_vnn(ctdb
, c
->vnn
)) {
41 DEBUG(0,(__location__
"Invalid data in ctdb_node_flag_change\n"));
45 if (!ctdb_validate_vnn(ctdb
, c
->vnn
)) {
46 DEBUG(0,("Bad vnn %u in flag_change_handler\n", c
->vnn
));
50 /* don't get the disconnected flag from the other node */
51 ctdb
->nodes
[c
->vnn
]->flags
=
52 (ctdb
->nodes
[c
->vnn
]->flags
&NODE_FLAGS_DISCONNECTED
)
53 | (c
->flags
& ~NODE_FLAGS_DISCONNECTED
);
54 DEBUG(2,("Node flags for node %u are now 0x%x\n", c
->vnn
, ctdb
->nodes
[c
->vnn
]->flags
));
56 /* make sure we don't hold any IPs when we shouldn't */
57 if (c
->vnn
== ctdb
->vnn
&&
58 (ctdb
->nodes
[c
->vnn
]->flags
& (NODE_FLAGS_INACTIVE
|NODE_FLAGS_BANNED
))) {
59 ctdb_release_all_ips(ctdb
);
63 /* called when the "startup" event script has finished */
64 static void ctdb_start_transport(struct ctdb_context
*ctdb
, int status
, void *p
)
67 DEBUG(0,("startup event failed!\n"));
68 ctdb_fatal(ctdb
, "startup event script failed");
71 /* start the transport running */
72 if (ctdb
->methods
->start(ctdb
) != 0) {
73 DEBUG(0,("transport failed to start!\n"));
74 ctdb_fatal(ctdb
, "transport failed to start");
77 /* start the recovery daemon process */
78 if (ctdb_start_recoverd(ctdb
) != 0) {
79 DEBUG(0,("Failed to start recovery daemon\n"));
83 /* a handler for when nodes are disabled/enabled */
84 ctdb_register_message_handler(ctdb
, ctdb
, CTDB_SRVID_NODE_FLAGS_CHANGED
,
85 flag_change_handler
, NULL
);
87 /* start monitoring for dead nodes */
88 ctdb_start_monitoring(ctdb
);
91 /* go into main ctdb loop */
92 static void ctdb_main_loop(struct ctdb_context
*ctdb
)
96 if (strcmp(ctdb
->transport
, "tcp") == 0) {
97 int ctdb_tcp_init(struct ctdb_context
*);
98 ret
= ctdb_tcp_init(ctdb
);
100 #ifdef USE_INFINIBAND
101 if (strcmp(ctdb
->transport
, "ib") == 0) {
102 int ctdb_ibw_init(struct ctdb_context
*);
103 ret
= ctdb_ibw_init(ctdb
);
107 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb
->transport
));
111 /* initialise the transport */
112 if (ctdb
->methods
->initialise(ctdb
) != 0) {
113 DEBUG(0,("transport failed to initialise!\n"));
114 ctdb_fatal(ctdb
, "transport failed to initialise");
117 /* tell all other nodes we've just started up */
118 ctdb_daemon_send_control(ctdb
, CTDB_BROADCAST_ALL
,
119 0, CTDB_CONTROL_STARTUP
, 0,
120 CTDB_CTRL_FLAG_NOREPLY
,
121 tdb_null
, NULL
, NULL
);
123 /* release any IPs we hold from previous runs of the daemon */
124 ctdb_release_all_ips(ctdb
);
126 ret
= ctdb_event_script_callback(ctdb
, timeval_zero(), ctdb
,
127 ctdb_start_transport
, NULL
, "startup");
129 DEBUG(0,("Failed startup event script\n"));
133 /* go into a wait loop to allow other nodes to complete */
134 event_loop_wait(ctdb
->ev
);
136 DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
/*
  install SIG_IGN as the disposition for 'signum'

  The result of sigaction() is not checked.
*/
static void block_signal(int signum)
{
	struct sigaction ignore_action;

	memset(&ignore_action, 0, sizeof(ignore_action));

	/* the mask contains only the signal being configured */
	sigemptyset(&ignore_action.sa_mask);
	sigaddset(&ignore_action.sa_mask, signum);
	ignore_action.sa_handler = SIG_IGN;

	sigaction(signum, &ignore_action, NULL);
}
155 send a packet to a client
157 static int daemon_queue_send(struct ctdb_client
*client
, struct ctdb_req_header
*hdr
)
159 client
->ctdb
->statistics
.client_packets_sent
++;
160 return ctdb_queue_send(client
->queue
, (uint8_t *)hdr
, hdr
->length
);
164 message handler for when we are in daemon mode. This redirects the message
167 static void daemon_message_handler(struct ctdb_context
*ctdb
, uint64_t srvid
,
168 TDB_DATA data
, void *private_data
)
170 struct ctdb_client
*client
= talloc_get_type(private_data
, struct ctdb_client
);
171 struct ctdb_req_message
*r
;
174 /* construct a message to send to the client containing the data */
175 len
= offsetof(struct ctdb_req_message
, data
) + data
.dsize
;
176 r
= ctdbd_allocate_pkt(ctdb
, ctdb
, CTDB_REQ_MESSAGE
,
177 len
, struct ctdb_req_message
);
178 CTDB_NO_MEMORY_VOID(ctdb
, r
);
180 talloc_set_name_const(r
, "req_message packet");
183 r
->datalen
= data
.dsize
;
184 memcpy(&r
->data
[0], data
.dptr
, data
.dsize
);
186 daemon_queue_send(client
, &r
->hdr
);
193 this is called when the ctdb daemon received a ctdb request to
194 set the srvid from the client
196 int daemon_register_message_handler(struct ctdb_context
*ctdb
, uint32_t client_id
, uint64_t srvid
)
198 struct ctdb_client
*client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
200 if (client
== NULL
) {
201 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
204 res
= ctdb_register_message_handler(ctdb
, client
, srvid
, daemon_message_handler
, client
);
206 DEBUG(0,(__location__
" Failed to register handler %llu in daemon\n",
207 (unsigned long long)srvid
));
209 DEBUG(2,(__location__
" Registered message handler for srvid=%llu\n",
210 (unsigned long long)srvid
));
213 /* this is a hack for Samba - we now know the pid of the Samba client */
214 if ((srvid
& 0xFFFFFFFF) == srvid
&&
215 kill(srvid
, 0) == 0) {
217 DEBUG(0,(__location__
" Registered PID %u for client %u\n",
218 (unsigned)client
->pid
, client_id
));
224 this is called when the ctdb daemon received a ctdb request to
225 remove a srvid from the client
227 int daemon_deregister_message_handler(struct ctdb_context
*ctdb
, uint32_t client_id
, uint64_t srvid
)
229 struct ctdb_client
*client
= ctdb_reqid_find(ctdb
, client_id
, struct ctdb_client
);
230 if (client
== NULL
) {
231 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
234 return ctdb_deregister_message_handler(ctdb
, srvid
, client
);
239 destroy a ctdb_client
241 static int ctdb_client_destructor(struct ctdb_client
*client
)
243 ctdb_takeover_client_destructor_hook(client
);
244 ctdb_reqid_remove(client
->ctdb
, client
->client_id
);
245 client
->ctdb
->statistics
.num_clients
--;
251 this is called when the ctdb daemon received a ctdb request message
252 from a local client over the unix domain socket
254 static void daemon_request_message_from_client(struct ctdb_client
*client
,
255 struct ctdb_req_message
*c
)
260 /* maybe the message is for another client on this node */
261 if (ctdb_get_vnn(client
->ctdb
)==c
->hdr
.destnode
) {
262 ctdb_request_message(client
->ctdb
, (struct ctdb_req_header
*)c
);
266 /* its for a remote node */
267 data
.dptr
= &c
->data
[0];
268 data
.dsize
= c
->datalen
;
269 res
= ctdb_daemon_send_message(client
->ctdb
, c
->hdr
.destnode
,
272 DEBUG(0,(__location__
" Failed to send message to remote node %u\n",
278 struct daemon_call_state
{
279 struct ctdb_client
*client
;
281 struct ctdb_call
*call
;
282 struct timeval start_time
;
286 complete a call from a client
288 static void daemon_call_from_client_callback(struct ctdb_call_state
*state
)
290 struct daemon_call_state
*dstate
= talloc_get_type(state
->async
.private_data
,
291 struct daemon_call_state
);
292 struct ctdb_reply_call
*r
;
295 struct ctdb_client
*client
= dstate
->client
;
297 talloc_steal(client
, dstate
);
298 talloc_steal(dstate
, dstate
->call
);
300 res
= ctdb_daemon_call_recv(state
, dstate
->call
);
302 DEBUG(0, (__location__
" ctdbd_call_recv() returned error\n"));
303 client
->ctdb
->statistics
.pending_calls
--;
304 ctdb_latency(&client
->ctdb
->statistics
.max_call_latency
, dstate
->start_time
);
308 length
= offsetof(struct ctdb_reply_call
, data
) + dstate
->call
->reply_data
.dsize
;
309 r
= ctdbd_allocate_pkt(client
->ctdb
, dstate
, CTDB_REPLY_CALL
,
310 length
, struct ctdb_reply_call
);
312 DEBUG(0, (__location__
" Failed to allocate reply_call in ctdb daemon\n"));
313 client
->ctdb
->statistics
.pending_calls
--;
314 ctdb_latency(&client
->ctdb
->statistics
.max_call_latency
, dstate
->start_time
);
317 r
->hdr
.reqid
= dstate
->reqid
;
318 r
->datalen
= dstate
->call
->reply_data
.dsize
;
319 memcpy(&r
->data
[0], dstate
->call
->reply_data
.dptr
, r
->datalen
);
321 res
= daemon_queue_send(client
, &r
->hdr
);
323 DEBUG(0, (__location__
" Failed to queue packet from daemon to client\n"));
325 ctdb_latency(&client
->ctdb
->statistics
.max_call_latency
, dstate
->start_time
);
327 client
->ctdb
->statistics
.pending_calls
--;
331 static void daemon_request_call_from_client(struct ctdb_client
*client
,
332 struct ctdb_req_call
*c
);
335 this is called when the ctdb daemon received a ctdb request call
336 from a local client over the unix domain socket
338 static void daemon_request_call_from_client(struct ctdb_client
*client
,
339 struct ctdb_req_call
*c
)
341 struct ctdb_call_state
*state
;
342 struct ctdb_db_context
*ctdb_db
;
343 struct daemon_call_state
*dstate
;
344 struct ctdb_call
*call
;
345 struct ctdb_ltdb_header header
;
348 struct ctdb_context
*ctdb
= client
->ctdb
;
350 ctdb
->statistics
.total_calls
++;
351 ctdb
->statistics
.pending_calls
++;
353 ctdb_db
= find_ctdb_db(client
->ctdb
, c
->db_id
);
355 DEBUG(0, (__location__
" Unknown database in request. db_id==0x%08x",
357 ctdb
->statistics
.pending_calls
--;
362 key
.dsize
= c
->keylen
;
364 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, key
, &header
,
365 (struct ctdb_req_header
*)c
, &data
,
366 daemon_incoming_packet
, client
, True
);
368 /* will retry later */
369 ctdb
->statistics
.pending_calls
--;
374 DEBUG(0,(__location__
" Unable to fetch record\n"));
375 ctdb
->statistics
.pending_calls
--;
379 dstate
= talloc(client
, struct daemon_call_state
);
380 if (dstate
== NULL
) {
381 ctdb_ltdb_unlock(ctdb_db
, key
);
382 DEBUG(0,(__location__
" Unable to allocate dstate\n"));
383 ctdb
->statistics
.pending_calls
--;
386 dstate
->start_time
= timeval_current();
387 dstate
->client
= client
;
388 dstate
->reqid
= c
->hdr
.reqid
;
389 talloc_steal(dstate
, data
.dptr
);
391 call
= dstate
->call
= talloc_zero(dstate
, struct ctdb_call
);
393 ctdb_ltdb_unlock(ctdb_db
, key
);
394 DEBUG(0,(__location__
" Unable to allocate call\n"));
395 ctdb
->statistics
.pending_calls
--;
396 ctdb_latency(&ctdb
->statistics
.max_call_latency
, dstate
->start_time
);
400 call
->call_id
= c
->callid
;
402 call
->call_data
.dptr
= c
->data
+ c
->keylen
;
403 call
->call_data
.dsize
= c
->calldatalen
;
404 call
->flags
= c
->flags
;
406 if (header
.dmaster
== ctdb
->vnn
) {
407 state
= ctdb_call_local_send(ctdb_db
, call
, &header
, &data
);
409 state
= ctdb_daemon_call_send_remote(ctdb_db
, call
, &header
);
412 ctdb_ltdb_unlock(ctdb_db
, key
);
415 DEBUG(0,(__location__
" Unable to setup call send\n"));
416 ctdb
->statistics
.pending_calls
--;
417 ctdb_latency(&ctdb
->statistics
.max_call_latency
, dstate
->start_time
);
420 talloc_steal(state
, dstate
);
421 talloc_steal(client
, state
);
423 state
->async
.fn
= daemon_call_from_client_callback
;
424 state
->async
.private_data
= dstate
;
428 static void daemon_request_control_from_client(struct ctdb_client
*client
,
429 struct ctdb_req_control
*c
);
431 /* data contains a packet from the client */
432 static void daemon_incoming_packet(void *p
, struct ctdb_req_header
*hdr
)
434 struct ctdb_client
*client
= talloc_get_type(p
, struct ctdb_client
);
436 struct ctdb_context
*ctdb
= client
->ctdb
;
438 /* place the packet as a child of a tmp_ctx. We then use
439 talloc_free() below to free it. If any of the calls want
440 to keep it, then they will steal it somewhere else, and the
441 talloc_free() will be a no-op */
442 tmp_ctx
= talloc_new(client
);
443 talloc_steal(tmp_ctx
, hdr
);
445 if (hdr
->ctdb_magic
!= CTDB_MAGIC
) {
446 ctdb_set_error(client
->ctdb
, "Non CTDB packet rejected in daemon\n");
450 if (hdr
->ctdb_version
!= CTDB_VERSION
) {
451 ctdb_set_error(client
->ctdb
, "Bad CTDB version 0x%x rejected in daemon\n", hdr
->ctdb_version
);
455 switch (hdr
->operation
) {
457 ctdb
->statistics
.client
.req_call
++;
458 daemon_request_call_from_client(client
, (struct ctdb_req_call
*)hdr
);
461 case CTDB_REQ_MESSAGE
:
462 ctdb
->statistics
.client
.req_message
++;
463 daemon_request_message_from_client(client
, (struct ctdb_req_message
*)hdr
);
466 case CTDB_REQ_CONTROL
:
467 ctdb
->statistics
.client
.req_control
++;
468 daemon_request_control_from_client(client
, (struct ctdb_req_control
*)hdr
);
472 DEBUG(0,(__location__
" daemon: unrecognized operation %u\n",
477 talloc_free(tmp_ctx
);
481 called when the daemon gets a incoming packet
483 static void ctdb_daemon_read_cb(uint8_t *data
, size_t cnt
, void *args
)
485 struct ctdb_client
*client
= talloc_get_type(args
, struct ctdb_client
);
486 struct ctdb_req_header
*hdr
;
493 client
->ctdb
->statistics
.client_packets_recv
++;
495 if (cnt
< sizeof(*hdr
)) {
496 ctdb_set_error(client
->ctdb
, "Bad packet length %u in daemon\n",
500 hdr
= (struct ctdb_req_header
*)data
;
501 if (cnt
!= hdr
->length
) {
502 ctdb_set_error(client
->ctdb
, "Bad header length %u expected %u\n in daemon",
503 (unsigned)hdr
->length
, (unsigned)cnt
);
507 if (hdr
->ctdb_magic
!= CTDB_MAGIC
) {
508 ctdb_set_error(client
->ctdb
, "Non CTDB packet rejected\n");
512 if (hdr
->ctdb_version
!= CTDB_VERSION
) {
513 ctdb_set_error(client
->ctdb
, "Bad CTDB version 0x%x rejected in daemon\n", hdr
->ctdb_version
);
517 DEBUG(3,(__location__
" client request %u of type %u length %u from "
518 "node %u to %u\n", hdr
->reqid
, hdr
->operation
, hdr
->length
,
519 hdr
->srcnode
, hdr
->destnode
));
521 /* it is the responsibility of the incoming packet function to free 'data' */
522 daemon_incoming_packet(client
, hdr
);
525 static void ctdb_accept_client(struct event_context
*ev
, struct fd_event
*fde
,
526 uint16_t flags
, void *private_data
)
528 struct sockaddr_in addr
;
531 struct ctdb_context
*ctdb
= talloc_get_type(private_data
, struct ctdb_context
);
532 struct ctdb_client
*client
;
534 memset(&addr
, 0, sizeof(addr
));
536 fd
= accept(ctdb
->daemon
.sd
, (struct sockaddr
*)&addr
, &len
);
542 set_close_on_exec(fd
);
544 client
= talloc_zero(ctdb
, struct ctdb_client
);
547 client
->client_id
= ctdb_reqid_new(ctdb
, client
);
548 ctdb
->statistics
.num_clients
++;
550 client
->queue
= ctdb_queue_setup(ctdb
, client
, fd
, CTDB_DS_ALIGNMENT
,
551 ctdb_daemon_read_cb
, client
);
553 talloc_set_destructor(client
, ctdb_client_destructor
);
559 create a unix domain socket and bind it
560 return a file descriptor open on the socket
562 static int ux_socket_bind(struct ctdb_context
*ctdb
)
564 struct sockaddr_un addr
;
566 ctdb
->daemon
.sd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
567 if (ctdb
->daemon
.sd
== -1) {
571 set_nonblocking(ctdb
->daemon
.sd
);
572 set_close_on_exec(ctdb
->daemon
.sd
);
575 /* AIX doesn't like this :( */
576 if (fchown(ctdb
->daemon
.sd
, geteuid(), getegid()) != 0 ||
577 fchmod(ctdb
->daemon
.sd
, 0700) != 0) {
578 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
583 set_nonblocking(ctdb
->daemon
.sd
);
585 memset(&addr
, 0, sizeof(addr
));
586 addr
.sun_family
= AF_UNIX
;
587 strncpy(addr
.sun_path
, ctdb
->daemon
.name
, sizeof(addr
.sun_path
));
589 if (bind(ctdb
->daemon
.sd
, (struct sockaddr
*)&addr
, sizeof(addr
)) == -1) {
590 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb
->daemon
.name
));
593 if (listen(ctdb
->daemon
.sd
, 10) != 0) {
594 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb
->daemon
.name
));
601 close(ctdb
->daemon
.sd
);
602 ctdb
->daemon
.sd
= -1;
607 delete the socket on exit - called on destruction of autofree context
609 static int unlink_destructor(const char *name
)
617 start the protocol going as a daemon
619 int ctdb_start_daemon(struct ctdb_context
*ctdb
, bool do_fork
)
622 struct fd_event
*fde
;
623 const char *domain_socket_name
;
625 /* get rid of any old sockets */
626 unlink(ctdb
->daemon
.name
);
628 /* create a unix domain stream socket to listen to */
629 res
= ux_socket_bind(ctdb
);
631 DEBUG(0,(__location__
" Failed to open CTDB unix domain socket\n"));
635 if (do_fork
&& fork()) {
639 tdb_reopen_all(False
);
644 block_signal(SIGPIPE
);
646 /* try to set us up as realtime */
647 ctdb_set_realtime(true);
649 /* ensure the socket is deleted on exit of the daemon */
650 domain_socket_name
= talloc_strdup(talloc_autofree_context(), ctdb
->daemon
.name
);
651 talloc_set_destructor(domain_socket_name
, unlink_destructor
);
653 ctdb
->ev
= s4_event_context_init(NULL
);
655 /* start frozen, then let the first election sort things out */
656 if (!ctdb_blocking_freeze(ctdb
)) {
657 DEBUG(0,("Failed to get initial freeze\n"));
661 /* force initial recovery for election */
662 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
664 /* now start accepting clients, only can do this once frozen */
665 fde
= event_add_fd(ctdb
->ev
, ctdb
, ctdb
->daemon
.sd
,
666 EVENT_FD_READ
|EVENT_FD_AUTOCLOSE
,
667 ctdb_accept_client
, ctdb
);
669 ctdb_main_loop(ctdb
);
675 allocate a packet for use in daemon<->daemon communication
677 struct ctdb_req_header
*_ctdb_transport_allocate(struct ctdb_context
*ctdb
,
679 enum ctdb_operation operation
,
680 size_t length
, size_t slength
,
684 struct ctdb_req_header
*hdr
;
686 length
= MAX(length
, slength
);
687 size
= (length
+(CTDB_DS_ALIGNMENT
-1)) & ~(CTDB_DS_ALIGNMENT
-1);
689 hdr
= (struct ctdb_req_header
*)ctdb
->methods
->allocate_pkt(mem_ctx
, size
);
691 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
692 operation
, (unsigned)length
));
695 talloc_set_name_const(hdr
, type
);
696 memset(hdr
, 0, slength
);
697 hdr
->length
= length
;
698 hdr
->operation
= operation
;
699 hdr
->ctdb_magic
= CTDB_MAGIC
;
700 hdr
->ctdb_version
= CTDB_VERSION
;
701 hdr
->generation
= ctdb
->vnn_map
->generation
;
702 hdr
->srcnode
= ctdb
->vnn
;
707 struct daemon_control_state
{
708 struct daemon_control_state
*next
, *prev
;
709 struct ctdb_client
*client
;
710 struct ctdb_req_control
*c
;
712 struct ctdb_node
*node
;
716 callback when a control reply comes in
718 static void daemon_control_callback(struct ctdb_context
*ctdb
,
719 int32_t status
, TDB_DATA data
,
720 const char *errormsg
,
723 struct daemon_control_state
*state
= talloc_get_type(private_data
,
724 struct daemon_control_state
);
725 struct ctdb_client
*client
= state
->client
;
726 struct ctdb_reply_control
*r
;
729 /* construct a message to send to the client containing the data */
730 len
= offsetof(struct ctdb_reply_control
, data
) + data
.dsize
;
732 len
+= strlen(errormsg
);
734 r
= ctdbd_allocate_pkt(ctdb
, state
, CTDB_REPLY_CONTROL
, len
,
735 struct ctdb_reply_control
);
736 CTDB_NO_MEMORY_VOID(ctdb
, r
);
738 r
->hdr
.reqid
= state
->reqid
;
740 r
->datalen
= data
.dsize
;
742 memcpy(&r
->data
[0], data
.dptr
, data
.dsize
);
744 r
->errorlen
= strlen(errormsg
);
745 memcpy(&r
->data
[r
->datalen
], errormsg
, r
->errorlen
);
748 daemon_queue_send(client
, &r
->hdr
);
754 fail all pending controls to a disconnected node
756 void ctdb_daemon_cancel_controls(struct ctdb_context
*ctdb
, struct ctdb_node
*node
)
758 struct daemon_control_state
*state
;
759 while ((state
= node
->pending_controls
)) {
760 DLIST_REMOVE(node
->pending_controls
, state
);
761 daemon_control_callback(ctdb
, (uint32_t)-1, tdb_null
,
762 "node is disconnected", state
);
767 destroy a daemon_control_state
769 static int daemon_control_destructor(struct daemon_control_state
*state
)
772 DLIST_REMOVE(state
->node
->pending_controls
, state
);
778 this is called when the ctdb daemon received a ctdb request control
779 from a local client over the unix domain socket
781 static void daemon_request_control_from_client(struct ctdb_client
*client
,
782 struct ctdb_req_control
*c
)
786 struct daemon_control_state
*state
;
787 TALLOC_CTX
*tmp_ctx
= talloc_new(client
);
789 if (c
->hdr
.destnode
== CTDB_CURRENT_NODE
) {
790 c
->hdr
.destnode
= client
->ctdb
->vnn
;
793 state
= talloc(client
, struct daemon_control_state
);
794 CTDB_NO_MEMORY_VOID(client
->ctdb
, state
);
796 state
->client
= client
;
797 state
->c
= talloc_steal(state
, c
);
798 state
->reqid
= c
->hdr
.reqid
;
799 if (ctdb_validate_vnn(client
->ctdb
, c
->hdr
.destnode
)) {
800 state
->node
= client
->ctdb
->nodes
[c
->hdr
.destnode
];
801 DLIST_ADD(state
->node
->pending_controls
, state
);
806 talloc_set_destructor(state
, daemon_control_destructor
);
808 if (c
->flags
& CTDB_CTRL_FLAG_NOREPLY
) {
809 talloc_steal(tmp_ctx
, state
);
812 data
.dptr
= &c
->data
[0];
813 data
.dsize
= c
->datalen
;
814 res
= ctdb_daemon_send_control(client
->ctdb
, c
->hdr
.destnode
,
815 c
->srvid
, c
->opcode
, client
->client_id
,
817 data
, daemon_control_callback
,
820 DEBUG(0,(__location__
" Failed to send control to remote node %u\n",
824 talloc_free(tmp_ctx
);
828 register a call function
830 int ctdb_daemon_set_call(struct ctdb_context
*ctdb
, uint32_t db_id
,
831 ctdb_fn_t fn
, int id
)
833 struct ctdb_registered_call
*call
;
834 struct ctdb_db_context
*ctdb_db
;
836 ctdb_db
= find_ctdb_db(ctdb
, db_id
);
837 if (ctdb_db
== NULL
) {
841 call
= talloc(ctdb_db
, struct ctdb_registered_call
);
845 DLIST_ADD(ctdb_db
->calls
, call
);
852 this local messaging handler is ugly, but is needed to prevent
853 recursion in ctdb_send_message() when the destination node is the
854 same as the source node
856 struct ctdb_local_message
{
857 struct ctdb_context
*ctdb
;
862 static void ctdb_local_message_trigger(struct event_context
*ev
, struct timed_event
*te
,
863 struct timeval t
, void *private_data
)
865 struct ctdb_local_message
*m
= talloc_get_type(private_data
,
866 struct ctdb_local_message
);
869 res
= ctdb_dispatch_message(m
->ctdb
, m
->srvid
, m
->data
);
871 DEBUG(0, (__location__
" Failed to dispatch message for srvid=%llu\n",
872 (unsigned long long)m
->srvid
));
877 static int ctdb_local_message(struct ctdb_context
*ctdb
, uint64_t srvid
, TDB_DATA data
)
879 struct ctdb_local_message
*m
;
880 m
= talloc(ctdb
, struct ctdb_local_message
);
881 CTDB_NO_MEMORY(ctdb
, m
);
886 m
->data
.dptr
= talloc_memdup(m
, m
->data
.dptr
, m
->data
.dsize
);
887 if (m
->data
.dptr
== NULL
) {
892 /* this needs to be done as an event to prevent recursion */
893 event_add_timed(ctdb
->ev
, m
, timeval_zero(), ctdb_local_message_trigger
, m
);
900 int ctdb_daemon_send_message(struct ctdb_context
*ctdb
, uint32_t vnn
,
901 uint64_t srvid
, TDB_DATA data
)
903 struct ctdb_req_message
*r
;
906 /* see if this is a message to ourselves */
907 if (vnn
== ctdb
->vnn
) {
908 return ctdb_local_message(ctdb
, srvid
, data
);
911 len
= offsetof(struct ctdb_req_message
, data
) + data
.dsize
;
912 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_MESSAGE
, len
,
913 struct ctdb_req_message
);
914 CTDB_NO_MEMORY(ctdb
, r
);
916 r
->hdr
.destnode
= vnn
;
918 r
->datalen
= data
.dsize
;
919 memcpy(&r
->data
[0], data
.dptr
, data
.dsize
);
921 ctdb_queue_packet(ctdb
, &r
->hdr
);