2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record
{
32 struct ctdb_context
*ctdb
;
33 struct ctdb_db_context
*ctdb_db
;
38 find the ctdb_db from a db index
40 struct ctdb_db_context
*find_ctdb_db(struct ctdb_context
*ctdb
, uint32_t id
)
42 struct ctdb_db_context
*ctdb_db
;
44 for (ctdb_db
=ctdb
->db_list
; ctdb_db
; ctdb_db
=ctdb_db
->next
) {
45 if (ctdb_db
->db_id
== id
) {
53 a varient of input packet that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p
, struct ctdb_req_header
*hdr
)
57 struct ctdb_context
*ctdb
= talloc_get_type(p
, struct ctdb_context
);
58 ctdb_input_pkt(ctdb
, hdr
);
65 static void ctdb_send_error(struct ctdb_context
*ctdb
,
66 struct ctdb_req_header
*hdr
, uint32_t status
,
67 const char *fmt
, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context
*ctdb
,
69 struct ctdb_req_header
*hdr
, uint32_t status
,
73 struct ctdb_reply_error
*r
;
77 if (ctdb
->methods
== NULL
) {
78 DEBUG(DEBUG_INFO
,(__location__
" Failed to send error. Transport is DOWN\n"));
83 msg
= talloc_vasprintf(ctdb
, fmt
, ap
);
85 ctdb_fatal(ctdb
, "Unable to allocate error in ctdb_send_error\n");
89 msglen
= strlen(msg
)+1;
90 len
= offsetof(struct ctdb_reply_error
, msg
);
91 r
= ctdb_transport_allocate(ctdb
, msg
, CTDB_REPLY_ERROR
, len
+ msglen
,
92 struct ctdb_reply_error
);
93 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
95 r
->hdr
.destnode
= hdr
->srcnode
;
96 r
->hdr
.reqid
= hdr
->reqid
;
99 memcpy(&r
->msg
[0], msg
, msglen
);
101 ctdb_queue_packet(ctdb
, &r
->hdr
);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. Note that this works because of this: When
117 * a record is migrated off a node, then the new DMASTER is stored
118 * in the record's copy on the former DMASTER.
120 static void ctdb_call_send_redirect(struct ctdb_context
*ctdb
,
121 struct ctdb_db_context
*ctdb_db
,
123 struct ctdb_req_call
*c
,
124 struct ctdb_ltdb_header
*header
)
126 uint32_t lmaster
= ctdb_lmaster(ctdb
, &key
);
128 c
->hdr
.destnode
= lmaster
;
129 if (ctdb
->pnn
== lmaster
) {
130 c
->hdr
.destnode
= header
->dmaster
;
134 if (c
->hopcount
%100 > 95) {
135 DEBUG(DEBUG_WARNING
,("High hopcount %d dbid:%s "
136 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c
->hopcount
, ctdb_db
->db_name
, ctdb_hash(&key
),
139 c
->hdr
.reqid
, ctdb
->pnn
, c
->hdr
.srcnode
, lmaster
,
140 header
->dmaster
, c
->hdr
.destnode
));
143 ctdb_queue_packet(ctdb
, &c
->hdr
);
150 caller must have the chainlock before calling this routine. Caller must be
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context
*ctdb_db
,
154 struct ctdb_ltdb_header
*header
,
155 TDB_DATA key
, TDB_DATA data
,
156 uint32_t new_dmaster
,
159 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
160 struct ctdb_reply_dmaster
*r
;
164 if (ctdb
->pnn
!= ctdb_lmaster(ctdb
, &key
)) {
165 DEBUG(DEBUG_ALERT
,(__location__
" Caller is not lmaster!\n"));
169 header
->dmaster
= new_dmaster
;
170 ret
= ctdb_ltdb_store(ctdb_db
, key
, header
, data
);
172 ctdb_fatal(ctdb
, "ctdb_send_dmaster_reply unable to update dmaster");
176 if (ctdb
->methods
== NULL
) {
177 ctdb_fatal(ctdb
, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx
= talloc_new(ctdb
);
185 /* send the CTDB_REPLY_DMASTER */
186 len
= offsetof(struct ctdb_reply_dmaster
, data
) + key
.dsize
+ data
.dsize
+ sizeof(uint32_t);
187 r
= ctdb_transport_allocate(ctdb
, tmp_ctx
, CTDB_REPLY_DMASTER
, len
,
188 struct ctdb_reply_dmaster
);
189 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
191 r
->hdr
.destnode
= new_dmaster
;
192 r
->hdr
.reqid
= reqid
;
193 r
->rsn
= header
->rsn
;
194 r
->keylen
= key
.dsize
;
195 r
->datalen
= data
.dsize
;
196 r
->db_id
= ctdb_db
->db_id
;
197 memcpy(&r
->data
[0], key
.dptr
, key
.dsize
);
198 memcpy(&r
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
199 memcpy(&r
->data
[key
.dsize
+data
.dsize
], &header
->flags
, sizeof(uint32_t));
201 ctdb_queue_packet(ctdb
, &r
->hdr
);
203 talloc_free(tmp_ctx
);
207 send a dmaster request (give another node the dmaster for a record)
209 This is always sent to the lmaster, which ensures that the lmaster
210 always knows who the dmaster is. The lmaster will then send a
211 CTDB_REPLY_DMASTER to the new dmaster
213 static void ctdb_call_send_dmaster(struct ctdb_db_context
*ctdb_db
,
214 struct ctdb_req_call
*c
,
215 struct ctdb_ltdb_header
*header
,
216 TDB_DATA
*key
, TDB_DATA
*data
)
218 struct ctdb_req_dmaster
*r
;
219 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
221 uint32_t lmaster
= ctdb_lmaster(ctdb
, key
);
223 if (ctdb
->methods
== NULL
) {
224 ctdb_fatal(ctdb
, "Failed ctdb_call_send_dmaster since transport is down");
228 if (data
->dsize
!= 0) {
229 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
232 if (lmaster
== ctdb
->pnn
) {
233 ctdb_send_dmaster_reply(ctdb_db
, header
, *key
, *data
,
234 c
->hdr
.srcnode
, c
->hdr
.reqid
);
238 len
= offsetof(struct ctdb_req_dmaster
, data
) + key
->dsize
+ data
->dsize
240 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_DMASTER
, len
,
241 struct ctdb_req_dmaster
);
242 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
243 r
->hdr
.destnode
= lmaster
;
244 r
->hdr
.reqid
= c
->hdr
.reqid
;
246 r
->rsn
= header
->rsn
;
247 r
->dmaster
= c
->hdr
.srcnode
;
248 r
->keylen
= key
->dsize
;
249 r
->datalen
= data
->dsize
;
250 memcpy(&r
->data
[0], key
->dptr
, key
->dsize
);
251 memcpy(&r
->data
[key
->dsize
], data
->dptr
, data
->dsize
);
252 memcpy(&r
->data
[key
->dsize
+ data
->dsize
], &header
->flags
, sizeof(uint32_t));
254 header
->dmaster
= c
->hdr
.srcnode
;
255 if (ctdb_ltdb_store(ctdb_db
, *key
, header
, *data
) != 0) {
256 ctdb_fatal(ctdb
, "Failed to store record in ctdb_call_send_dmaster");
259 ctdb_queue_packet(ctdb
, &r
->hdr
);
264 static void ctdb_sticky_pindown_timeout(struct event_context
*ev
, struct timed_event
*te
,
265 struct timeval t
, void *private_data
)
267 struct ctdb_sticky_record
*sr
= talloc_get_type(private_data
,
268 struct ctdb_sticky_record
);
270 DEBUG(DEBUG_ERR
,("Pindown timeout db:%s unstick record\n", sr
->ctdb_db
->db_name
));
271 if (sr
->pindown
!= NULL
) {
272 talloc_free(sr
->pindown
);
278 ctdb_set_sticky_pindown(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
280 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
282 struct ctdb_sticky_record
*sr
;
284 k
= ctdb_key_to_idkey(tmp_ctx
, key
);
286 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
287 talloc_free(tmp_ctx
);
291 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
293 talloc_free(tmp_ctx
);
297 talloc_free(tmp_ctx
);
299 if (sr
->pindown
== NULL
) {
300 DEBUG(DEBUG_ERR
,("Pinning down record in %s for %d ms\n", ctdb_db
->db_name
, ctdb
->tunable
.sticky_pindown
));
301 sr
->pindown
= talloc_new(sr
);
302 if (sr
->pindown
== NULL
) {
303 DEBUG(DEBUG_ERR
,("Failed to allocate pindown context for sticky record\n"));
306 event_add_timed(ctdb
->ev
, sr
->pindown
, timeval_current_ofs(ctdb
->tunable
.sticky_pindown
/ 1000, (ctdb
->tunable
.sticky_pindown
* 1000) % 1000000), ctdb_sticky_pindown_timeout
, sr
);
313 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
314 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
316 must be called with the chainlock held. This function releases the chainlock
318 static void ctdb_become_dmaster(struct ctdb_db_context
*ctdb_db
,
319 struct ctdb_req_header
*hdr
,
320 TDB_DATA key
, TDB_DATA data
,
321 uint64_t rsn
, uint32_t record_flags
)
323 struct ctdb_call_state
*state
;
324 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
325 struct ctdb_ltdb_header header
;
328 DEBUG(DEBUG_DEBUG
,("pnn %u dmaster response %08x\n", ctdb
->pnn
, ctdb_hash(&key
)));
332 header
.dmaster
= ctdb
->pnn
;
333 header
.flags
= record_flags
;
335 state
= ctdb_reqid_find(ctdb
, hdr
->reqid
, struct ctdb_call_state
);
338 if (state
->call
->flags
& CTDB_CALL_FLAG_VACUUM_MIGRATION
) {
340 * We temporarily add the VACUUM_MIGRATED flag to
341 * the record flags, so that ctdb_ltdb_store can
342 * decide whether the record should be stored or
345 header
.flags
|= CTDB_REC_FLAG_VACUUM_MIGRATED
;
349 if (ctdb_ltdb_store(ctdb_db
, key
, &header
, data
) != 0) {
350 ctdb_fatal(ctdb
, "ctdb_reply_dmaster store failed\n");
352 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
354 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
359 /* we just became DMASTER and this database is "sticky",
360 see if the record is flagged as "hot" and set up a pin-down
361 context to stop migrations for a little while if so
363 if (ctdb_db
->sticky
) {
364 ctdb_set_sticky_pindown(ctdb
, ctdb_db
, key
);
368 DEBUG(DEBUG_ERR
,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
369 ctdb
->pnn
, hdr
->reqid
, hdr
->srcnode
));
371 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
373 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
378 if (key
.dsize
!= state
->call
->key
.dsize
|| memcmp(key
.dptr
, state
->call
->key
.dptr
, key
.dsize
)) {
379 DEBUG(DEBUG_ERR
, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr
->reqid
, hdr
->srcnode
));
381 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
383 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
388 if (hdr
->reqid
!= state
->reqid
) {
389 /* we found a record but it was the wrong one */
390 DEBUG(DEBUG_ERR
, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr
->reqid
, hdr
->srcnode
));
392 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
394 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
399 ctdb_call_local(ctdb_db
, state
->call
, &header
, state
, &data
, true);
401 ret
= ctdb_ltdb_unlock(ctdb_db
, state
->call
->key
);
403 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
406 state
->state
= CTDB_CALL_DONE
;
407 if (state
->async
.fn
) {
408 state
->async
.fn(state
);
412 struct dmaster_defer_call
{
413 struct dmaster_defer_call
*next
, *prev
;
414 struct ctdb_context
*ctdb
;
415 struct ctdb_req_header
*hdr
;
418 struct dmaster_defer_queue
{
419 struct ctdb_context
*ctdb
;
421 struct dmaster_defer_call
*deferred_calls
;
424 static void dmaster_defer_reprocess(struct tevent_context
*ev
,
425 struct tevent_timer
*te
,
429 struct dmaster_defer_call
*call
= talloc_get_type(
430 private_data
, struct dmaster_defer_call
);
432 ctdb_input_pkt(call
->ctdb
, call
->hdr
);
436 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue
*ddq
)
438 /* Ignore requests, if database recovery happens in-between. */
439 if (ddq
->generation
!= ddq
->ctdb
->vnn_map
->generation
) {
443 while (ddq
->deferred_calls
!= NULL
) {
444 struct dmaster_defer_call
*call
= ddq
->deferred_calls
;
446 DLIST_REMOVE(ddq
->deferred_calls
, call
);
448 talloc_steal(call
->ctdb
, call
);
449 tevent_add_timer(call
->ctdb
->ev
, call
, timeval_zero(),
450 dmaster_defer_reprocess
, call
);
455 static void *insert_ddq_callback(void *parm
, void *data
)
464 * This function is used to reigster a key in database that needs to be updated.
465 * Any requests for that key should get deferred till this is completed.
467 static int dmaster_defer_setup(struct ctdb_db_context
*ctdb_db
,
468 struct ctdb_req_header
*hdr
,
472 struct dmaster_defer_queue
*ddq
;
474 k
= ctdb_key_to_idkey(hdr
, key
);
476 DEBUG(DEBUG_ERR
, ("Failed to allocate key for dmaster defer setup\n"));
481 ddq
= trbt_lookuparray32(ctdb_db
->defer_dmaster
, k
[0], k
);
487 ddq
= talloc(hdr
, struct dmaster_defer_queue
);
489 DEBUG(DEBUG_ERR
, ("Failed to allocate dmaster defer queue\n"));
493 ddq
->ctdb
= ctdb_db
->ctdb
;
494 ddq
->generation
= hdr
->generation
;
495 ddq
->deferred_calls
= NULL
;
497 trbt_insertarray32_callback(ctdb_db
->defer_dmaster
, k
[0], k
,
498 insert_ddq_callback
, ddq
);
499 talloc_set_destructor(ddq
, dmaster_defer_queue_destructor
);
505 static int dmaster_defer_add(struct ctdb_db_context
*ctdb_db
,
506 struct ctdb_req_header
*hdr
,
509 struct dmaster_defer_queue
*ddq
;
510 struct dmaster_defer_call
*call
;
513 k
= ctdb_key_to_idkey(hdr
, key
);
515 DEBUG(DEBUG_ERR
, ("Failed to allocate key for dmaster defer add\n"));
519 ddq
= trbt_lookuparray32(ctdb_db
->defer_dmaster
, k
[0], k
);
527 if (ddq
->generation
!= hdr
->generation
) {
528 talloc_set_destructor(ddq
, NULL
);
533 call
= talloc(ddq
, struct dmaster_defer_call
);
535 DEBUG(DEBUG_ERR
, ("Failed to allocate dmaster defer call\n"));
539 call
->ctdb
= ctdb_db
->ctdb
;
540 call
->hdr
= talloc_steal(call
, hdr
);
542 DLIST_ADD_END(ddq
->deferred_calls
, call
, NULL
);
548 called when a CTDB_REQ_DMASTER packet comes in
550 this comes into the lmaster for a record when the current dmaster
551 wants to give up the dmaster role and give it to someone else
553 void ctdb_request_dmaster(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
555 struct ctdb_req_dmaster
*c
= (struct ctdb_req_dmaster
*)hdr
;
556 TDB_DATA key
, data
, data2
;
557 struct ctdb_ltdb_header header
;
558 struct ctdb_db_context
*ctdb_db
;
559 uint32_t record_flags
= 0;
564 key
.dsize
= c
->keylen
;
565 data
.dptr
= c
->data
+ c
->keylen
;
566 data
.dsize
= c
->datalen
;
567 len
= offsetof(struct ctdb_req_dmaster
, data
) + key
.dsize
+ data
.dsize
569 if (len
<= c
->hdr
.length
) {
570 memcpy(&record_flags
, &c
->data
[c
->keylen
+ c
->datalen
],
571 sizeof(record_flags
));
574 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
576 ctdb_send_error(ctdb
, hdr
, -1,
577 "Unknown database in request. db_id==0x%08x",
582 dmaster_defer_setup(ctdb_db
, hdr
, key
);
584 /* fetch the current record */
585 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, key
, &header
, hdr
, &data2
,
586 ctdb_call_input_pkt
, ctdb
, false);
588 ctdb_fatal(ctdb
, "ctdb_req_dmaster failed to fetch record");
592 DEBUG(DEBUG_INFO
,(__location__
" deferring ctdb_request_dmaster\n"));
596 if (ctdb_lmaster(ctdb
, &key
) != ctdb
->pnn
) {
597 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
598 ctdb
->pnn
, ctdb_lmaster(ctdb
, &key
),
599 hdr
->generation
, ctdb
->vnn_map
->generation
));
600 ctdb_fatal(ctdb
, "ctdb_req_dmaster to non-lmaster");
603 DEBUG(DEBUG_DEBUG
,("pnn %u dmaster request on %08x for %u from %u\n",
604 ctdb
->pnn
, ctdb_hash(&key
), c
->dmaster
, c
->hdr
.srcnode
));
606 /* its a protocol error if the sending node is not the current dmaster */
607 if (header
.dmaster
!= hdr
->srcnode
) {
608 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
609 ctdb
->pnn
, c
->dmaster
, hdr
->srcnode
, header
.dmaster
, ctdb_hash(&key
),
610 ctdb_db
->db_id
, hdr
->generation
, ctdb
->vnn_map
->generation
,
611 (unsigned long long)c
->rsn
, (unsigned long long)header
.rsn
, c
->hdr
.reqid
,
612 (key
.dsize
>= 4)?(*(uint32_t *)key
.dptr
):0));
613 if (header
.rsn
!= 0 || header
.dmaster
!= ctdb
->pnn
) {
614 DEBUG(DEBUG_ERR
,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
616 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
617 ctdb_ltdb_unlock(ctdb_db
, key
);
622 if (header
.rsn
> c
->rsn
) {
623 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
624 ctdb
->pnn
, c
->dmaster
, hdr
->srcnode
, header
.dmaster
, ctdb_hash(&key
),
625 ctdb_db
->db_id
, hdr
->generation
, ctdb
->vnn_map
->generation
,
626 (unsigned long long)c
->rsn
, (unsigned long long)header
.rsn
, c
->hdr
.reqid
));
629 /* use the rsn from the sending node */
632 /* store the record flags from the sending node */
633 header
.flags
= record_flags
;
635 /* check if the new dmaster is the lmaster, in which case we
636 skip the dmaster reply */
637 if (c
->dmaster
== ctdb
->pnn
) {
638 ctdb_become_dmaster(ctdb_db
, hdr
, key
, data
, c
->rsn
, record_flags
);
640 ctdb_send_dmaster_reply(ctdb_db
, &header
, key
, data
, c
->dmaster
, hdr
->reqid
);
642 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
644 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
649 static void ctdb_sticky_record_timeout(struct event_context
*ev
, struct timed_event
*te
,
650 struct timeval t
, void *private_data
)
652 struct ctdb_sticky_record
*sr
= talloc_get_type(private_data
,
653 struct ctdb_sticky_record
);
657 static void *ctdb_make_sticky_record_callback(void *parm
, void *data
)
660 DEBUG(DEBUG_ERR
,("Already have sticky record registered. Free old %p and create new %p\n", data
, parm
));
667 ctdb_make_record_sticky(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
669 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
671 struct ctdb_sticky_record
*sr
;
673 k
= ctdb_key_to_idkey(tmp_ctx
, key
);
675 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
676 talloc_free(tmp_ctx
);
680 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
682 talloc_free(tmp_ctx
);
686 sr
= talloc(ctdb_db
->sticky_records
, struct ctdb_sticky_record
);
688 talloc_free(tmp_ctx
);
689 DEBUG(DEBUG_ERR
,("Failed to allocate sticky record structure\n"));
694 sr
->ctdb_db
= ctdb_db
;
697 DEBUG(DEBUG_ERR
,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
698 ctdb
->tunable
.sticky_duration
,
699 ctdb_db
->db_name
, ctdb_hash(&key
)));
701 trbt_insertarray32_callback(ctdb_db
->sticky_records
, k
[0], &k
[0], ctdb_make_sticky_record_callback
, sr
);
703 event_add_timed(ctdb
->ev
, sr
, timeval_current_ofs(ctdb
->tunable
.sticky_duration
, 0), ctdb_sticky_record_timeout
, sr
);
705 talloc_free(tmp_ctx
);
709 struct pinned_down_requeue_handle
{
710 struct ctdb_context
*ctdb
;
711 struct ctdb_req_header
*hdr
;
714 struct pinned_down_deferred_call
{
715 struct ctdb_context
*ctdb
;
716 struct ctdb_req_header
*hdr
;
719 static void pinned_down_requeue(struct event_context
*ev
, struct timed_event
*te
,
720 struct timeval t
, void *private_data
)
722 struct pinned_down_requeue_handle
*handle
= talloc_get_type(private_data
, struct pinned_down_requeue_handle
);
723 struct ctdb_context
*ctdb
= handle
->ctdb
;
725 talloc_steal(ctdb
, handle
->hdr
);
726 ctdb_call_input_pkt(ctdb
, handle
->hdr
);
731 static int pinned_down_destructor(struct pinned_down_deferred_call
*pinned_down
)
733 struct ctdb_context
*ctdb
= pinned_down
->ctdb
;
734 struct pinned_down_requeue_handle
*handle
= talloc(ctdb
, struct pinned_down_requeue_handle
);
736 handle
->ctdb
= pinned_down
->ctdb
;
737 handle
->hdr
= pinned_down
->hdr
;
738 talloc_steal(handle
, handle
->hdr
);
740 event_add_timed(ctdb
->ev
, handle
, timeval_zero(), pinned_down_requeue
, handle
);
746 ctdb_defer_pinned_down_request(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_req_header
*hdr
)
748 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
750 struct ctdb_sticky_record
*sr
;
751 struct pinned_down_deferred_call
*pinned_down
;
753 k
= ctdb_key_to_idkey(tmp_ctx
, key
);
755 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
756 talloc_free(tmp_ctx
);
760 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
762 talloc_free(tmp_ctx
);
766 talloc_free(tmp_ctx
);
768 if (sr
->pindown
== NULL
) {
772 pinned_down
= talloc(sr
->pindown
, struct pinned_down_deferred_call
);
773 if (pinned_down
== NULL
) {
774 DEBUG(DEBUG_ERR
,("Failed to allocate structure for deferred pinned down request\n"));
778 pinned_down
->ctdb
= ctdb
;
779 pinned_down
->hdr
= hdr
;
781 talloc_set_destructor(pinned_down
, pinned_down_destructor
);
782 talloc_steal(pinned_down
, hdr
);
788 ctdb_update_db_stat_hot_keys(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, int hopcount
)
792 /* smallest value is always at index 0 */
793 if (hopcount
<= ctdb_db
->statistics
.hot_keys
[0].count
) {
797 /* see if we already know this key */
798 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
799 if (key
.dsize
!= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
) {
802 if (memcmp(key
.dptr
, ctdb_db
->statistics
.hot_keys
[i
].key
.dptr
, key
.dsize
)) {
805 /* found an entry for this key */
806 if (hopcount
<= ctdb_db
->statistics
.hot_keys
[i
].count
) {
809 ctdb_db
->statistics
.hot_keys
[i
].count
= hopcount
;
813 if (ctdb_db
->statistics
.num_hot_keys
< MAX_HOT_KEYS
) {
814 id
= ctdb_db
->statistics
.num_hot_keys
;
815 ctdb_db
->statistics
.num_hot_keys
++;
820 if (ctdb_db
->statistics
.hot_keys
[id
].key
.dptr
!= NULL
) {
821 talloc_free(ctdb_db
->statistics
.hot_keys
[id
].key
.dptr
);
823 ctdb_db
->statistics
.hot_keys
[id
].key
.dsize
= key
.dsize
;
824 ctdb_db
->statistics
.hot_keys
[id
].key
.dptr
= talloc_memdup(ctdb_db
, key
.dptr
, key
.dsize
);
825 ctdb_db
->statistics
.hot_keys
[id
].count
= hopcount
;
826 DEBUG(DEBUG_NOTICE
,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
827 ctdb_db
->db_name
, ctdb_hash(&key
), id
, hopcount
));
830 for (i
= 1; i
< MAX_HOT_KEYS
; i
++) {
831 if (ctdb_db
->statistics
.hot_keys
[i
].count
== 0) {
834 if (ctdb_db
->statistics
.hot_keys
[i
].count
< ctdb_db
->statistics
.hot_keys
[0].count
) {
835 hopcount
= ctdb_db
->statistics
.hot_keys
[i
].count
;
836 ctdb_db
->statistics
.hot_keys
[i
].count
= ctdb_db
->statistics
.hot_keys
[0].count
;
837 ctdb_db
->statistics
.hot_keys
[0].count
= hopcount
;
839 key
= ctdb_db
->statistics
.hot_keys
[i
].key
;
840 ctdb_db
->statistics
.hot_keys
[i
].key
= ctdb_db
->statistics
.hot_keys
[0].key
;
841 ctdb_db
->statistics
.hot_keys
[0].key
= key
;
847 called when a CTDB_REQ_CALL packet comes in
849 void ctdb_request_call(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
851 struct ctdb_req_call
*c
= (struct ctdb_req_call
*)hdr
;
853 struct ctdb_reply_call
*r
;
855 struct ctdb_ltdb_header header
;
856 struct ctdb_call
*call
;
857 struct ctdb_db_context
*ctdb_db
;
858 int tmp_count
, bucket
;
860 if (ctdb
->methods
== NULL
) {
861 DEBUG(DEBUG_INFO
,(__location__
" Failed ctdb_request_call. Transport is DOWN\n"));
866 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
868 ctdb_send_error(ctdb
, hdr
, -1,
869 "Unknown database in request. db_id==0x%08x",
874 call
= talloc(hdr
, struct ctdb_call
);
875 CTDB_NO_MEMORY_FATAL(ctdb
, call
);
877 call
->call_id
= c
->callid
;
878 call
->key
.dptr
= c
->data
;
879 call
->key
.dsize
= c
->keylen
;
880 call
->call_data
.dptr
= c
->data
+ c
->keylen
;
881 call
->call_data
.dsize
= c
->calldatalen
;
882 call
->reply_data
.dptr
= NULL
;
883 call
->reply_data
.dsize
= 0;
886 /* If this record is pinned down we should defer the
887 request until the pindown times out
889 if (ctdb_db
->sticky
) {
890 if (ctdb_defer_pinned_down_request(ctdb
, ctdb_db
, call
->key
, hdr
) == 0) {
892 ("Defer request for pinned down record in %s\n", ctdb_db
->db_name
));
898 if (dmaster_defer_add(ctdb_db
, hdr
, call
->key
) == 0) {
903 /* determine if we are the dmaster for this key. This also
904 fetches the record data (if any), thus avoiding a 2nd fetch of the data
905 if the call will be answered locally */
907 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, call
->key
, &header
, hdr
, &data
,
908 ctdb_call_input_pkt
, ctdb
, false);
910 ctdb_send_error(ctdb
, hdr
, ret
, "ltdb fetch failed in ctdb_request_call");
915 DEBUG(DEBUG_INFO
,(__location__
" deferred ctdb_request_call\n"));
920 /* Dont do READONLY if we dont have a tracking database */
921 if ((c
->flags
& CTDB_WANT_READONLY
) && !ctdb_db
->readonly
) {
922 c
->flags
&= ~CTDB_WANT_READONLY
;
925 if (header
.flags
& CTDB_REC_RO_REVOKE_COMPLETE
) {
926 header
.flags
&= ~CTDB_REC_RO_FLAGS
;
927 CTDB_INCREMENT_STAT(ctdb
, total_ro_revokes
);
928 CTDB_INCREMENT_DB_STAT(ctdb_db
, db_ro_revokes
);
929 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
930 ctdb_fatal(ctdb
, "Failed to write header with cleared REVOKE flag");
932 /* and clear out the tracking data */
933 if (tdb_delete(ctdb_db
->rottdb
, call
->key
) != 0) {
934 DEBUG(DEBUG_ERR
,(__location__
" Failed to clear out trackingdb record\n"));
938 /* if we are revoking, we must defer all other calls until the revoke
941 if (header
.flags
& CTDB_REC_RO_REVOKING_READONLY
) {
942 talloc_free(data
.dptr
);
943 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
945 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, call
->key
, hdr
, ctdb_call_input_pkt
, ctdb
) != 0) {
946 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
953 * If we are not the dmaster and are not hosting any delegations,
954 * then we redirect the request to the node than can answer it
955 * (the lmaster or the dmaster).
957 if ((header
.dmaster
!= ctdb
->pnn
)
958 && (!(header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
)) ) {
959 talloc_free(data
.dptr
);
960 ctdb_call_send_redirect(ctdb
, ctdb_db
, call
->key
, c
, &header
);
962 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
964 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
970 if ( (!(c
->flags
& CTDB_WANT_READONLY
))
971 && (header
.flags
& (CTDB_REC_RO_HAVE_DELEGATIONS
|CTDB_REC_RO_HAVE_READONLY
)) ) {
972 header
.flags
|= CTDB_REC_RO_REVOKING_READONLY
;
973 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
974 ctdb_fatal(ctdb
, "Failed to store record with HAVE_DELEGATIONS set");
976 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
978 if (ctdb_start_revoke_ro_record(ctdb
, ctdb_db
, call
->key
, &header
, data
) != 0) {
979 ctdb_fatal(ctdb
, "Failed to start record revoke");
981 talloc_free(data
.dptr
);
983 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, call
->key
, hdr
, ctdb_call_input_pkt
, ctdb
) != 0) {
984 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
991 /* If this is the first request for delegation. bump rsn and set
992 * the delegations flag
994 if ((c
->flags
& CTDB_WANT_READONLY
)
995 && (c
->callid
== CTDB_FETCH_WITH_HEADER_FUNC
)
996 && (!(header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
))) {
998 header
.flags
|= CTDB_REC_RO_HAVE_DELEGATIONS
;
999 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
1000 ctdb_fatal(ctdb
, "Failed to store record with HAVE_DELEGATIONS set");
1003 if ((c
->flags
& CTDB_WANT_READONLY
)
1004 && (call
->call_id
== CTDB_FETCH_WITH_HEADER_FUNC
)) {
1007 tdata
= tdb_fetch(ctdb_db
->rottdb
, call
->key
);
1008 if (ctdb_trackingdb_add_pnn(ctdb
, &tdata
, c
->hdr
.srcnode
) != 0) {
1009 ctdb_fatal(ctdb
, "Failed to add node to trackingdb");
1011 if (tdb_store(ctdb_db
->rottdb
, call
->key
, tdata
, TDB_REPLACE
) != 0) {
1012 ctdb_fatal(ctdb
, "Failed to store trackingdb data");
1016 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
1018 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
1021 len
= offsetof(struct ctdb_reply_call
, data
) + data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
1022 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REPLY_CALL
, len
,
1023 struct ctdb_reply_call
);
1024 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
1025 r
->hdr
.destnode
= c
->hdr
.srcnode
;
1026 r
->hdr
.reqid
= c
->hdr
.reqid
;
1028 r
->datalen
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
1030 header
.flags
|= CTDB_REC_RO_HAVE_READONLY
;
1031 header
.flags
&= ~CTDB_REC_RO_HAVE_DELEGATIONS
;
1032 memcpy(&r
->data
[0], &header
, sizeof(struct ctdb_ltdb_header
));
1035 memcpy(&r
->data
[sizeof(struct ctdb_ltdb_header
)], data
.dptr
, data
.dsize
);
1038 ctdb_queue_packet(ctdb
, &r
->hdr
);
1039 CTDB_INCREMENT_STAT(ctdb
, total_ro_delegations
);
1040 CTDB_INCREMENT_DB_STAT(ctdb_db
, db_ro_delegations
);
1047 CTDB_UPDATE_STAT(ctdb
, max_hop_count
, c
->hopcount
);
1048 tmp_count
= c
->hopcount
;
1054 if (bucket
>= MAX_COUNT_BUCKETS
) {
1055 bucket
= MAX_COUNT_BUCKETS
- 1;
1057 CTDB_INCREMENT_STAT(ctdb
, hop_count_bucket
[bucket
]);
1058 CTDB_INCREMENT_DB_STAT(ctdb_db
, hop_count_bucket
[bucket
]);
1059 ctdb_update_db_stat_hot_keys(ctdb_db
, call
->key
, c
->hopcount
);
1061 /* If this database supports sticky records, then check if the
1062 hopcount is big. If it is it means the record is hot and we
1063 should make it sticky.
1065 if (ctdb_db
->sticky
&& c
->hopcount
>= ctdb
->tunable
.hopcount_make_sticky
) {
1066 ctdb_make_record_sticky(ctdb
, ctdb_db
, call
->key
);
1070 /* Try if possible to migrate the record off to the caller node.
1071 * From the clients perspective a fetch of the data is just as
1072 * expensive as a migration.
1074 if (c
->hdr
.srcnode
!= ctdb
->pnn
) {
1075 if (ctdb_db
->persistent_state
) {
1076 DEBUG(DEBUG_INFO
, (__location__
" refusing migration"
1077 " of key %s while transaction is active\n",
1078 (char *)call
->key
.dptr
));
1080 DEBUG(DEBUG_DEBUG
,("pnn %u starting migration of %08x to %u\n",
1081 ctdb
->pnn
, ctdb_hash(&(call
->key
)), c
->hdr
.srcnode
));
1082 ctdb_call_send_dmaster(ctdb_db
, c
, &header
, &(call
->key
), &data
);
1083 talloc_free(data
.dptr
);
1085 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
1087 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
1094 ret
= ctdb_call_local(ctdb_db
, call
, &header
, hdr
, &data
, true);
1096 DEBUG(DEBUG_ERR
,(__location__
" ctdb_call_local failed\n"));
1100 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
1102 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
1105 len
= offsetof(struct ctdb_reply_call
, data
) + call
->reply_data
.dsize
;
1106 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REPLY_CALL
, len
,
1107 struct ctdb_reply_call
);
1108 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
1109 r
->hdr
.destnode
= hdr
->srcnode
;
1110 r
->hdr
.reqid
= hdr
->reqid
;
1111 r
->status
= call
->status
;
1112 r
->datalen
= call
->reply_data
.dsize
;
1113 if (call
->reply_data
.dsize
) {
1114 memcpy(&r
->data
[0], call
->reply_data
.dptr
, call
->reply_data
.dsize
);
1117 ctdb_queue_packet(ctdb
, &r
->hdr
);
1124 * called when a CTDB_REPLY_CALL packet comes in
1126 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1127 * contains any reply data from the call
1129 void ctdb_reply_call(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1131 struct ctdb_reply_call
*c
= (struct ctdb_reply_call
*)hdr
;
1132 struct ctdb_call_state
*state
;
1134 state
= ctdb_reqid_find(ctdb
, hdr
->reqid
, struct ctdb_call_state
);
1135 if (state
== NULL
) {
1136 DEBUG(DEBUG_ERR
, (__location__
" reqid %u not found\n", hdr
->reqid
));
1140 if (hdr
->reqid
!= state
->reqid
) {
1141 /* we found a record but it was the wrong one */
1142 DEBUG(DEBUG_ERR
, ("Dropped orphaned call reply with reqid:%u\n",hdr
->reqid
));
1147 /* read only delegation processing */
1148 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1149 * delegation since we may need to update the record header
1151 if (state
->c
->callid
== CTDB_FETCH_WITH_HEADER_FUNC
) {
1152 struct ctdb_db_context
*ctdb_db
= state
->ctdb_db
;
1153 struct ctdb_ltdb_header
*header
= (struct ctdb_ltdb_header
*)&c
->data
[0];
1154 struct ctdb_ltdb_header oldheader
;
1155 TDB_DATA key
, data
, olddata
;
1158 if (!(header
->flags
& CTDB_REC_RO_HAVE_READONLY
)) {
1163 key
.dsize
= state
->c
->keylen
;
1164 key
.dptr
= state
->c
->data
;
1165 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
,
1166 ctdb_call_input_pkt
, ctdb
, false);
1171 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock in ctdb_reply_call\n"));
1175 ret
= ctdb_ltdb_fetch(ctdb_db
, key
, &oldheader
, state
, &olddata
);
1177 DEBUG(DEBUG_ERR
, ("Failed to fetch old record in ctdb_reply_call\n"));
1178 ctdb_ltdb_unlock(ctdb_db
, key
);
1182 if (header
->rsn
<= oldheader
.rsn
) {
1183 ctdb_ltdb_unlock(ctdb_db
, key
);
1187 if (c
->datalen
< sizeof(struct ctdb_ltdb_header
)) {
1188 DEBUG(DEBUG_ERR
,(__location__
" Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c
->datalen
));
1189 ctdb_ltdb_unlock(ctdb_db
, key
);
1193 data
.dsize
= c
->datalen
- sizeof(struct ctdb_ltdb_header
);
1194 data
.dptr
= &c
->data
[sizeof(struct ctdb_ltdb_header
)];
1195 ret
= ctdb_ltdb_store(ctdb_db
, key
, header
, data
);
1197 DEBUG(DEBUG_ERR
, ("Failed to store new record in ctdb_reply_call\n"));
1198 ctdb_ltdb_unlock(ctdb_db
, key
);
1202 ctdb_ltdb_unlock(ctdb_db
, key
);
1206 state
->call
->reply_data
.dptr
= c
->data
;
1207 state
->call
->reply_data
.dsize
= c
->datalen
;
1208 state
->call
->status
= c
->status
;
1210 talloc_steal(state
, c
);
1212 state
->state
= CTDB_CALL_DONE
;
1213 if (state
->async
.fn
) {
1214 state
->async
.fn(state
);
1220 * called when a CTDB_REPLY_DMASTER packet comes in
1222 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1223 * request packet. It means that the current dmaster wants to give us
1226 void ctdb_reply_dmaster(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1228 struct ctdb_reply_dmaster
*c
= (struct ctdb_reply_dmaster
*)hdr
;
1229 struct ctdb_db_context
*ctdb_db
;
1231 uint32_t record_flags
= 0;
1235 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
1236 if (ctdb_db
== NULL
) {
1237 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c
->db_id
));
1242 key
.dsize
= c
->keylen
;
1243 data
.dptr
= &c
->data
[key
.dsize
];
1244 data
.dsize
= c
->datalen
;
1245 len
= offsetof(struct ctdb_reply_dmaster
, data
) + key
.dsize
+ data
.dsize
1247 if (len
<= c
->hdr
.length
) {
1248 memcpy(&record_flags
, &c
->data
[c
->keylen
+ c
->datalen
],
1249 sizeof(record_flags
));
1252 dmaster_defer_setup(ctdb_db
, hdr
, key
);
1254 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
,
1255 ctdb_call_input_pkt
, ctdb
, false);
1260 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock in ctdb_reply_dmaster\n"));
1264 ctdb_become_dmaster(ctdb_db
, hdr
, key
, data
, c
->rsn
, record_flags
);
1269 called when a CTDB_REPLY_ERROR packet comes in
1271 void ctdb_reply_error(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1273 struct ctdb_reply_error
*c
= (struct ctdb_reply_error
*)hdr
;
1274 struct ctdb_call_state
*state
;
1276 state
= ctdb_reqid_find(ctdb
, hdr
->reqid
, struct ctdb_call_state
);
1277 if (state
== NULL
) {
1278 DEBUG(DEBUG_ERR
,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1279 ctdb
->pnn
, hdr
->reqid
));
1283 if (hdr
->reqid
!= state
->reqid
) {
1284 /* we found a record but it was the wrong one */
1285 DEBUG(DEBUG_ERR
, ("Dropped orphaned error reply with reqid:%u\n",hdr
->reqid
));
1289 talloc_steal(state
, c
);
1291 state
->state
= CTDB_CALL_ERROR
;
1292 state
->errmsg
= (char *)c
->msg
;
1293 if (state
->async
.fn
) {
1294 state
->async
.fn(state
);
1302 static int ctdb_call_destructor(struct ctdb_call_state
*state
)
1304 DLIST_REMOVE(state
->ctdb_db
->ctdb
->pending_calls
, state
);
1305 ctdb_reqid_remove(state
->ctdb_db
->ctdb
, state
->reqid
);
1311 called when a ctdb_call needs to be resent after a reconfigure event
1313 static void ctdb_call_resend(struct ctdb_call_state
*state
)
1315 struct ctdb_context
*ctdb
= state
->ctdb_db
->ctdb
;
1317 state
->generation
= ctdb
->vnn_map
->generation
;
1319 /* use a new reqid, in case the old reply does eventually come in */
1320 ctdb_reqid_remove(ctdb
, state
->reqid
);
1321 state
->reqid
= ctdb_reqid_new(ctdb
, state
);
1322 state
->c
->hdr
.reqid
= state
->reqid
;
1324 /* update the generation count for this request, so its valid with the new vnn_map */
1325 state
->c
->hdr
.generation
= state
->generation
;
1327 /* send the packet to ourselves, it will be redirected appropriately */
1328 state
->c
->hdr
.destnode
= ctdb
->pnn
;
1330 ctdb_queue_packet(ctdb
, &state
->c
->hdr
);
1331 DEBUG(DEBUG_NOTICE
,("resent ctdb_call\n"));
1335 resend all pending calls on recovery
1337 void ctdb_call_resend_all(struct ctdb_context
*ctdb
)
1339 struct ctdb_call_state
*state
, *next
;
1340 for (state
=ctdb
->pending_calls
;state
;state
=next
) {
1342 ctdb_call_resend(state
);
1347 this allows the caller to setup a async.fn
1349 static void call_local_trigger(struct event_context
*ev
, struct timed_event
*te
,
1350 struct timeval t
, void *private_data
)
1352 struct ctdb_call_state
*state
= talloc_get_type(private_data
, struct ctdb_call_state
);
1353 if (state
->async
.fn
) {
1354 state
->async
.fn(state
);
1360 construct an event driven local ctdb_call
1362 this is used so that locally processed ctdb_call requests are processed
1363 in an event driven manner
1365 struct ctdb_call_state
*ctdb_call_local_send(struct ctdb_db_context
*ctdb_db
,
1366 struct ctdb_call
*call
,
1367 struct ctdb_ltdb_header
*header
,
1370 struct ctdb_call_state
*state
;
1371 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1374 state
= talloc_zero(ctdb_db
, struct ctdb_call_state
);
1375 CTDB_NO_MEMORY_NULL(ctdb
, state
);
1377 talloc_steal(state
, data
->dptr
);
1379 state
->state
= CTDB_CALL_DONE
;
1380 state
->call
= talloc(state
, struct ctdb_call
);
1381 CTDB_NO_MEMORY_NULL(ctdb
, state
->call
);
1382 *(state
->call
) = *call
;
1383 state
->ctdb_db
= ctdb_db
;
1385 ret
= ctdb_call_local(ctdb_db
, state
->call
, header
, state
, data
, true);
1387 DEBUG(DEBUG_DEBUG
,("ctdb_call_local() failed, ignoring return code %d\n", ret
));
1390 event_add_timed(ctdb
->ev
, state
, timeval_zero(), call_local_trigger
, state
);
1397 make a remote ctdb call - async send. Called in daemon context.
1399 This constructs a ctdb_call request and queues it for processing.
1400 This call never blocks.
1402 struct ctdb_call_state
*ctdb_daemon_call_send_remote(struct ctdb_db_context
*ctdb_db
,
1403 struct ctdb_call
*call
,
1404 struct ctdb_ltdb_header
*header
)
1407 struct ctdb_call_state
*state
;
1408 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1410 if (ctdb
->methods
== NULL
) {
1411 DEBUG(DEBUG_INFO
,(__location__
" Failed send packet. Transport is down\n"));
1415 state
= talloc_zero(ctdb_db
, struct ctdb_call_state
);
1416 CTDB_NO_MEMORY_NULL(ctdb
, state
);
1417 state
->call
= talloc(state
, struct ctdb_call
);
1418 CTDB_NO_MEMORY_NULL(ctdb
, state
->call
);
1420 state
->reqid
= ctdb_reqid_new(ctdb
, state
);
1421 state
->ctdb_db
= ctdb_db
;
1422 talloc_set_destructor(state
, ctdb_call_destructor
);
1424 len
= offsetof(struct ctdb_req_call
, data
) + call
->key
.dsize
+ call
->call_data
.dsize
;
1425 state
->c
= ctdb_transport_allocate(ctdb
, state
, CTDB_REQ_CALL
, len
,
1426 struct ctdb_req_call
);
1427 CTDB_NO_MEMORY_NULL(ctdb
, state
->c
);
1428 state
->c
->hdr
.destnode
= header
->dmaster
;
1430 /* this limits us to 16k outstanding messages - not unreasonable */
1431 state
->c
->hdr
.reqid
= state
->reqid
;
1432 state
->c
->flags
= call
->flags
;
1433 state
->c
->db_id
= ctdb_db
->db_id
;
1434 state
->c
->callid
= call
->call_id
;
1435 state
->c
->hopcount
= 0;
1436 state
->c
->keylen
= call
->key
.dsize
;
1437 state
->c
->calldatalen
= call
->call_data
.dsize
;
1438 memcpy(&state
->c
->data
[0], call
->key
.dptr
, call
->key
.dsize
);
1439 memcpy(&state
->c
->data
[call
->key
.dsize
],
1440 call
->call_data
.dptr
, call
->call_data
.dsize
);
1441 *(state
->call
) = *call
;
1442 state
->call
->call_data
.dptr
= &state
->c
->data
[call
->key
.dsize
];
1443 state
->call
->key
.dptr
= &state
->c
->data
[0];
1445 state
->state
= CTDB_CALL_WAIT
;
1446 state
->generation
= ctdb
->vnn_map
->generation
;
1448 DLIST_ADD(ctdb
->pending_calls
, state
);
1450 ctdb_queue_packet(ctdb
, &state
->c
->hdr
);
1456 make a remote ctdb call - async recv - called in daemon context
1458 This is called when the program wants to wait for a ctdb_call to complete and get the
1459 results. This call will block unless the call has already completed.
1461 int ctdb_daemon_call_recv(struct ctdb_call_state
*state
, struct ctdb_call
*call
)
1463 while (state
->state
< CTDB_CALL_DONE
) {
1464 event_loop_once(state
->ctdb_db
->ctdb
->ev
);
1466 if (state
->state
!= CTDB_CALL_DONE
) {
1467 ctdb_set_error(state
->ctdb_db
->ctdb
, "%s", state
->errmsg
);
1472 if (state
->call
->reply_data
.dsize
) {
1473 call
->reply_data
.dptr
= talloc_memdup(call
,
1474 state
->call
->reply_data
.dptr
,
1475 state
->call
->reply_data
.dsize
);
1476 call
->reply_data
.dsize
= state
->call
->reply_data
.dsize
;
1478 call
->reply_data
.dptr
= NULL
;
1479 call
->reply_data
.dsize
= 0;
1481 call
->status
= state
->call
->status
;
1488 send a keepalive packet to the other node
1490 void ctdb_send_keepalive(struct ctdb_context
*ctdb
, uint32_t destnode
)
1492 struct ctdb_req_keepalive
*r
;
1494 if (ctdb
->methods
== NULL
) {
1495 DEBUG(DEBUG_INFO
,(__location__
" Failed to send keepalive. Transport is DOWN\n"));
1499 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_KEEPALIVE
,
1500 sizeof(struct ctdb_req_keepalive
),
1501 struct ctdb_req_keepalive
);
1502 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
1503 r
->hdr
.destnode
= destnode
;
1506 CTDB_INCREMENT_STAT(ctdb
, keepalive_packets_sent
);
1508 ctdb_queue_packet(ctdb
, &r
->hdr
);
1515 struct revokechild_deferred_call
{
1516 struct ctdb_context
*ctdb
;
1517 struct ctdb_req_header
*hdr
;
1518 deferred_requeue_fn fn
;
1522 struct revokechild_handle
{
1523 struct revokechild_handle
*next
, *prev
;
1524 struct ctdb_context
*ctdb
;
1525 struct ctdb_db_context
*ctdb_db
;
1526 struct fd_event
*fde
;
1533 struct revokechild_requeue_handle
{
1534 struct ctdb_context
*ctdb
;
1535 struct ctdb_req_header
*hdr
;
1536 deferred_requeue_fn fn
;
1540 static void deferred_call_requeue(struct event_context
*ev
, struct timed_event
*te
,
1541 struct timeval t
, void *private_data
)
1543 struct revokechild_requeue_handle
*requeue_handle
= talloc_get_type(private_data
, struct revokechild_requeue_handle
);
1545 requeue_handle
->fn(requeue_handle
->ctx
, requeue_handle
->hdr
);
1546 talloc_free(requeue_handle
);
1549 static int deferred_call_destructor(struct revokechild_deferred_call
*deferred_call
)
1551 struct ctdb_context
*ctdb
= deferred_call
->ctdb
;
1552 struct revokechild_requeue_handle
*requeue_handle
= talloc(ctdb
, struct revokechild_requeue_handle
);
1553 struct ctdb_req_call
*c
= (struct ctdb_req_call
*)deferred_call
->hdr
;
1555 requeue_handle
->ctdb
= ctdb
;
1556 requeue_handle
->hdr
= deferred_call
->hdr
;
1557 requeue_handle
->fn
= deferred_call
->fn
;
1558 requeue_handle
->ctx
= deferred_call
->ctx
;
1559 talloc_steal(requeue_handle
, requeue_handle
->hdr
);
1561 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1562 event_add_timed(ctdb
->ev
, requeue_handle
, timeval_current_ofs(c
->flags
& CTDB_WANT_READONLY
? 1 : 0, 0), deferred_call_requeue
, requeue_handle
);
1568 static int revokechild_destructor(struct revokechild_handle
*rc
)
1570 if (rc
->fde
!= NULL
) {
1571 talloc_free(rc
->fde
);
1574 if (rc
->fd
[0] != -1) {
1577 if (rc
->fd
[1] != -1) {
1580 ctdb_kill(rc
->ctdb
, rc
->child
, SIGKILL
);
1582 DLIST_REMOVE(rc
->ctdb_db
->revokechild_active
, rc
);
1586 static void revokechild_handler(struct event_context
*ev
, struct fd_event
*fde
,
1587 uint16_t flags
, void *private_data
)
1589 struct revokechild_handle
*rc
= talloc_get_type(private_data
,
1590 struct revokechild_handle
);
1594 ret
= sys_read(rc
->fd
[0], &c
, 1);
1596 DEBUG(DEBUG_ERR
,("Failed to read status from revokechild. errno:%d\n", errno
));
1602 DEBUG(DEBUG_ERR
,("revokechild returned failure. status:%d\n", c
));
1611 struct ctdb_revoke_state
{
1612 struct ctdb_db_context
*ctdb_db
;
1614 struct ctdb_ltdb_header
*header
;
1621 static void update_record_cb(struct ctdb_client_control_state
*state
)
1623 struct ctdb_revoke_state
*revoke_state
;
1627 if (state
== NULL
) {
1630 revoke_state
= state
->async
.private_data
;
1632 state
->async
.fn
= NULL
;
1633 ret
= ctdb_control_recv(state
->ctdb
, state
, state
, NULL
, &res
, NULL
);
1634 if ((ret
!= 0) || (res
!= 0)) {
1635 DEBUG(DEBUG_ERR
,("Recv for revoke update record failed ret:%d res:%d\n", ret
, res
));
1636 revoke_state
->status
= -1;
1639 revoke_state
->count
--;
1640 if (revoke_state
->count
<= 0) {
1641 revoke_state
->finished
= 1;
1645 static void revoke_send_cb(struct ctdb_context
*ctdb
, uint32_t pnn
, void *private_data
)
1647 struct ctdb_revoke_state
*revoke_state
= private_data
;
1648 struct ctdb_client_control_state
*state
;
1650 state
= ctdb_ctrl_updaterecord_send(ctdb
, revoke_state
, timeval_current_ofs(ctdb
->tunable
.control_timeout
,0), pnn
, revoke_state
->ctdb_db
, revoke_state
->key
, revoke_state
->header
, revoke_state
->data
);
1651 if (state
== NULL
) {
1652 DEBUG(DEBUG_ERR
,("Failure to send update record to revoke readonly delegation\n"));
1653 revoke_state
->status
= -1;
1656 state
->async
.fn
= update_record_cb
;
1657 state
->async
.private_data
= revoke_state
;
1659 revoke_state
->count
++;
1663 static void ctdb_revoke_timeout_handler(struct event_context
*ev
, struct timed_event
*te
,
1664 struct timeval yt
, void *private_data
)
1666 struct ctdb_revoke_state
*state
= private_data
;
1668 DEBUG(DEBUG_ERR
,("Timed out waiting for revoke to finish\n"));
1669 state
->finished
= 1;
1673 static int ctdb_revoke_all_delegations(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA tdata
, TDB_DATA key
, struct ctdb_ltdb_header
*header
, TDB_DATA data
)
1675 struct ctdb_revoke_state
*state
= talloc_zero(ctdb
, struct ctdb_revoke_state
);
1676 struct ctdb_ltdb_header new_header
;
1679 state
->ctdb_db
= ctdb_db
;
1681 state
->header
= header
;
1684 ctdb_trackingdb_traverse(ctdb
, tdata
, revoke_send_cb
, state
);
1686 event_add_timed(ctdb
->ev
, state
, timeval_current_ofs(ctdb
->tunable
.control_timeout
, 0), ctdb_revoke_timeout_handler
, state
);
1688 while (state
->finished
== 0) {
1689 event_loop_once(ctdb
->ev
);
1692 if (ctdb_ltdb_lock(ctdb_db
, key
) != 0) {
1693 DEBUG(DEBUG_ERR
,("Failed to chainlock the database in revokechild\n"));
1697 if (ctdb_ltdb_fetch(ctdb_db
, key
, &new_header
, state
, &new_data
) != 0) {
1698 ctdb_ltdb_unlock(ctdb_db
, key
);
1699 DEBUG(DEBUG_ERR
,("Failed for fetch tdb record in revokechild\n"));
1704 if (new_header
.rsn
> header
->rsn
) {
1705 ctdb_ltdb_unlock(ctdb_db
, key
);
1706 DEBUG(DEBUG_ERR
,("RSN too high in tdb record in revokechild\n"));
1710 if ( (new_header
.flags
& (CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
)) != (CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
) ) {
1711 ctdb_ltdb_unlock(ctdb_db
, key
);
1712 DEBUG(DEBUG_ERR
,("Flags are wrong in tdb record in revokechild\n"));
1718 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1719 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1721 if (state
->status
== 0) {
1723 new_header
.flags
|= CTDB_REC_RO_REVOKE_COMPLETE
;
1725 DEBUG(DEBUG_NOTICE
, ("Revoke all delegations failed, retrying.\n"));
1726 new_header
.flags
&= ~CTDB_REC_RO_REVOKING_READONLY
;
1728 if (ctdb_ltdb_store(ctdb_db
, key
, &new_header
, new_data
) != 0) {
1729 ctdb_ltdb_unlock(ctdb_db
, key
);
1730 DEBUG(DEBUG_ERR
,("Failed to write new record in revokechild\n"));
1734 ctdb_ltdb_unlock(ctdb_db
, key
);
1741 int ctdb_start_revoke_ro_record(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_ltdb_header
*header
, TDB_DATA data
)
1744 struct revokechild_handle
*rc
;
1745 pid_t parent
= getpid();
1748 header
->flags
&= ~(CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
|CTDB_REC_RO_HAVE_READONLY
);
1749 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
1752 if ((rc
= talloc_zero(ctdb_db
, struct revokechild_handle
)) == NULL
) {
1753 DEBUG(DEBUG_ERR
,("Failed to allocate revokechild_handle\n"));
1757 tdata
= tdb_fetch(ctdb_db
->rottdb
, key
);
1758 if (tdata
.dsize
> 0) {
1762 tdata
.dptr
= talloc_memdup(rc
, tdata
.dptr
, tdata
.dsize
);
1768 rc
->ctdb_db
= ctdb_db
;
1772 talloc_set_destructor(rc
, revokechild_destructor
);
1774 rc
->key
.dsize
= key
.dsize
;
1775 rc
->key
.dptr
= talloc_memdup(rc
, key
.dptr
, key
.dsize
);
1776 if (rc
->key
.dptr
== NULL
) {
1777 DEBUG(DEBUG_ERR
,("Failed to allocate key for revokechild_handle\n"));
1784 DEBUG(DEBUG_ERR
,("Failed to allocate key for revokechild_handle\n"));
1790 rc
->child
= ctdb_fork(ctdb
);
1791 if (rc
->child
== (pid_t
)-1) {
1792 DEBUG(DEBUG_ERR
,("Failed to fork child for revokechild\n"));
1797 if (rc
->child
== 0) {
1800 debug_extra
= talloc_asprintf(NULL
, "revokechild-%s:", ctdb_db
->db_name
);
1802 ctdb_set_process_name("ctdb_revokechild");
1803 if (switch_from_server_to_client(ctdb
, "revokechild-%s", ctdb_db
->db_name
) != 0) {
1804 DEBUG(DEBUG_ERR
,("Failed to switch from server to client for revokechild process\n"));
1806 goto child_finished
;
1809 c
= ctdb_revoke_all_delegations(ctdb
, ctdb_db
, tdata
, key
, header
, data
);
1812 sys_write(rc
->fd
[1], &c
, 1);
1813 /* make sure we die when our parent dies */
1814 while (ctdb_kill(ctdb
, parent
, 0) == 0 || errno
!= ESRCH
) {
1822 set_close_on_exec(rc
->fd
[0]);
1824 /* This is an active revokechild child process */
1825 DLIST_ADD_END(ctdb_db
->revokechild_active
, rc
, NULL
);
1827 rc
->fde
= event_add_fd(ctdb
->ev
, rc
, rc
->fd
[0],
1828 EVENT_FD_READ
, revokechild_handler
,
1830 if (rc
->fde
== NULL
) {
1831 DEBUG(DEBUG_ERR
,("Failed to set up fd event for revokechild process\n"));
1834 tevent_fd_set_auto_close(rc
->fde
);
1839 int ctdb_add_revoke_deferred_call(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_req_header
*hdr
, deferred_requeue_fn fn
, void *call_context
)
1841 struct revokechild_handle
*rc
;
1842 struct revokechild_deferred_call
*deferred_call
;
1844 for (rc
= ctdb_db
->revokechild_active
; rc
; rc
= rc
->next
) {
1845 if (rc
->key
.dsize
== 0) {
1848 if (rc
->key
.dsize
!= key
.dsize
) {
1851 if (!memcmp(rc
->key
.dptr
, key
.dptr
, key
.dsize
)) {
1857 DEBUG(DEBUG_ERR
,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1861 deferred_call
= talloc(rc
, struct revokechild_deferred_call
);
1862 if (deferred_call
== NULL
) {
1863 DEBUG(DEBUG_ERR
,("Failed to allocate deferred call structure for revoking record\n"));
1867 deferred_call
->ctdb
= ctdb
;
1868 deferred_call
->hdr
= hdr
;
1869 deferred_call
->fn
= fn
;
1870 deferred_call
->ctx
= call_context
;
1872 talloc_set_destructor(deferred_call
, deferred_call_destructor
);
1873 talloc_steal(deferred_call
, hdr
);