2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
24 #include "system/network.h"
25 #include "system/filesys.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/util_process.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 struct ctdb_sticky_record
{
45 struct ctdb_context
*ctdb
;
46 struct ctdb_db_context
*ctdb_db
;
51 find the ctdb_db from a db index
53 struct ctdb_db_context
*find_ctdb_db(struct ctdb_context
*ctdb
, uint32_t id
)
55 struct ctdb_db_context
*ctdb_db
;
57 for (ctdb_db
=ctdb
->db_list
; ctdb_db
; ctdb_db
=ctdb_db
->next
) {
58 if (ctdb_db
->db_id
== id
) {
66 a varient of input packet that can be used in lock requeue
68 static void ctdb_call_input_pkt(void *p
, struct ctdb_req_header
*hdr
)
70 struct ctdb_context
*ctdb
= talloc_get_type(p
, struct ctdb_context
);
71 ctdb_input_pkt(ctdb
, hdr
);
78 static void ctdb_send_error(struct ctdb_context
*ctdb
,
79 struct ctdb_req_header
*hdr
, uint32_t status
,
80 const char *fmt
, ...) PRINTF_ATTRIBUTE(4,5);
81 static void ctdb_send_error(struct ctdb_context
*ctdb
,
82 struct ctdb_req_header
*hdr
, uint32_t status
,
86 struct ctdb_reply_error_old
*r
;
90 if (ctdb
->methods
== NULL
) {
91 DEBUG(DEBUG_INFO
,(__location__
" Failed to send error. Transport is DOWN\n"));
96 msg
= talloc_vasprintf(ctdb
, fmt
, ap
);
98 ctdb_fatal(ctdb
, "Unable to allocate error in ctdb_send_error\n");
102 msglen
= strlen(msg
)+1;
103 len
= offsetof(struct ctdb_reply_error_old
, msg
);
104 r
= ctdb_transport_allocate(ctdb
, msg
, CTDB_REPLY_ERROR
, len
+ msglen
,
105 struct ctdb_reply_error_old
);
106 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
108 r
->hdr
.destnode
= hdr
->srcnode
;
109 r
->hdr
.reqid
= hdr
->reqid
;
112 memcpy(&r
->msg
[0], msg
, msglen
);
114 ctdb_queue_packet(ctdb
, &r
->hdr
);
121 * send a redirect reply
123 * The logic behind this function is this:
125 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
126 * to its local ctdb (ctdb_request_call). If the node is not itself
127 * the record's DMASTER, it first redirects the packet to the
128 * record's LMASTER. The LMASTER then redirects the call packet to
129 * the current DMASTER. Note that this works because of this: When
130 * a record is migrated off a node, then the new DMASTER is stored
131 * in the record's copy on the former DMASTER.
133 static void ctdb_call_send_redirect(struct ctdb_context
*ctdb
,
134 struct ctdb_db_context
*ctdb_db
,
136 struct ctdb_req_call_old
*c
,
137 struct ctdb_ltdb_header
*header
)
139 uint32_t lmaster
= ctdb_lmaster(ctdb
, &key
);
141 c
->hdr
.destnode
= lmaster
;
142 if (ctdb
->pnn
== lmaster
) {
143 c
->hdr
.destnode
= header
->dmaster
;
147 if (c
->hopcount
%100 > 95) {
148 DEBUG(DEBUG_WARNING
,("High hopcount %d dbid:%s "
149 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
150 "header->dmaster:%d dst:%d\n",
151 c
->hopcount
, ctdb_db
->db_name
, ctdb_hash(&key
),
152 c
->hdr
.reqid
, ctdb
->pnn
, c
->hdr
.srcnode
, lmaster
,
153 header
->dmaster
, c
->hdr
.destnode
));
156 ctdb_queue_packet(ctdb
, &c
->hdr
);
163 caller must have the chainlock before calling this routine. Caller must be
166 static void ctdb_send_dmaster_reply(struct ctdb_db_context
*ctdb_db
,
167 struct ctdb_ltdb_header
*header
,
168 TDB_DATA key
, TDB_DATA data
,
169 uint32_t new_dmaster
,
172 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
173 struct ctdb_reply_dmaster_old
*r
;
177 if (ctdb
->pnn
!= ctdb_lmaster(ctdb
, &key
)) {
178 DEBUG(DEBUG_ALERT
,(__location__
" Caller is not lmaster!\n"));
182 header
->dmaster
= new_dmaster
;
183 ret
= ctdb_ltdb_store(ctdb_db
, key
, header
, data
);
185 ctdb_fatal(ctdb
, "ctdb_send_dmaster_reply unable to update dmaster");
189 if (ctdb
->methods
== NULL
) {
190 ctdb_fatal(ctdb
, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
194 /* put the packet on a temporary context, allowing us to safely free
195 it below even if ctdb_reply_dmaster() has freed it already */
196 tmp_ctx
= talloc_new(ctdb
);
198 /* send the CTDB_REPLY_DMASTER */
199 len
= offsetof(struct ctdb_reply_dmaster_old
, data
) + key
.dsize
+ data
.dsize
+ sizeof(uint32_t);
200 r
= ctdb_transport_allocate(ctdb
, tmp_ctx
, CTDB_REPLY_DMASTER
, len
,
201 struct ctdb_reply_dmaster_old
);
202 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
204 r
->hdr
.destnode
= new_dmaster
;
205 r
->hdr
.reqid
= reqid
;
206 r
->hdr
.generation
= ctdb_db
->generation
;
207 r
->rsn
= header
->rsn
;
208 r
->keylen
= key
.dsize
;
209 r
->datalen
= data
.dsize
;
210 r
->db_id
= ctdb_db
->db_id
;
211 memcpy(&r
->data
[0], key
.dptr
, key
.dsize
);
212 memcpy(&r
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
213 memcpy(&r
->data
[key
.dsize
+data
.dsize
], &header
->flags
, sizeof(uint32_t));
215 ctdb_queue_packet(ctdb
, &r
->hdr
);
217 talloc_free(tmp_ctx
);
221 send a dmaster request (give another node the dmaster for a record)
223 This is always sent to the lmaster, which ensures that the lmaster
224 always knows who the dmaster is. The lmaster will then send a
225 CTDB_REPLY_DMASTER to the new dmaster
227 static void ctdb_call_send_dmaster(struct ctdb_db_context
*ctdb_db
,
228 struct ctdb_req_call_old
*c
,
229 struct ctdb_ltdb_header
*header
,
230 TDB_DATA
*key
, TDB_DATA
*data
)
232 struct ctdb_req_dmaster_old
*r
;
233 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
235 uint32_t lmaster
= ctdb_lmaster(ctdb
, key
);
237 if (ctdb
->methods
== NULL
) {
238 ctdb_fatal(ctdb
, "Failed ctdb_call_send_dmaster since transport is down");
242 if (data
->dsize
!= 0) {
243 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
246 if (lmaster
== ctdb
->pnn
) {
247 ctdb_send_dmaster_reply(ctdb_db
, header
, *key
, *data
,
248 c
->hdr
.srcnode
, c
->hdr
.reqid
);
252 len
= offsetof(struct ctdb_req_dmaster_old
, data
) + key
->dsize
+ data
->dsize
254 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_DMASTER
, len
,
255 struct ctdb_req_dmaster_old
);
256 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
257 r
->hdr
.destnode
= lmaster
;
258 r
->hdr
.reqid
= c
->hdr
.reqid
;
259 r
->hdr
.generation
= ctdb_db
->generation
;
261 r
->rsn
= header
->rsn
;
262 r
->dmaster
= c
->hdr
.srcnode
;
263 r
->keylen
= key
->dsize
;
264 r
->datalen
= data
->dsize
;
265 memcpy(&r
->data
[0], key
->dptr
, key
->dsize
);
266 memcpy(&r
->data
[key
->dsize
], data
->dptr
, data
->dsize
);
267 memcpy(&r
->data
[key
->dsize
+ data
->dsize
], &header
->flags
, sizeof(uint32_t));
269 header
->dmaster
= c
->hdr
.srcnode
;
270 if (ctdb_ltdb_store(ctdb_db
, *key
, header
, *data
) != 0) {
271 ctdb_fatal(ctdb
, "Failed to store record in ctdb_call_send_dmaster");
274 ctdb_queue_packet(ctdb
, &r
->hdr
);
279 static void ctdb_sticky_pindown_timeout(struct tevent_context
*ev
,
280 struct tevent_timer
*te
,
281 struct timeval t
, void *private_data
)
283 struct ctdb_sticky_record
*sr
= talloc_get_type(private_data
,
284 struct ctdb_sticky_record
);
286 DEBUG(DEBUG_ERR
,("Pindown timeout db:%s unstick record\n", sr
->ctdb_db
->db_name
));
287 if (sr
->pindown
!= NULL
) {
288 talloc_free(sr
->pindown
);
294 ctdb_set_sticky_pindown(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
296 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
298 struct ctdb_sticky_record
*sr
;
300 k
= ctdb_key_to_idkey(tmp_ctx
, key
);
302 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
303 talloc_free(tmp_ctx
);
307 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
309 talloc_free(tmp_ctx
);
313 talloc_free(tmp_ctx
);
315 if (sr
->pindown
== NULL
) {
316 DEBUG(DEBUG_ERR
,("Pinning down record in %s for %d ms\n", ctdb_db
->db_name
, ctdb
->tunable
.sticky_pindown
));
317 sr
->pindown
= talloc_new(sr
);
318 if (sr
->pindown
== NULL
) {
319 DEBUG(DEBUG_ERR
,("Failed to allocate pindown context for sticky record\n"));
322 tevent_add_timer(ctdb
->ev
, sr
->pindown
,
323 timeval_current_ofs(ctdb
->tunable
.sticky_pindown
/ 1000,
324 (ctdb
->tunable
.sticky_pindown
* 1000) % 1000000),
325 ctdb_sticky_pindown_timeout
, sr
);
332 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
333 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
335 must be called with the chainlock held. This function releases the chainlock
337 static void ctdb_become_dmaster(struct ctdb_db_context
*ctdb_db
,
338 struct ctdb_req_header
*hdr
,
339 TDB_DATA key
, TDB_DATA data
,
340 uint64_t rsn
, uint32_t record_flags
)
342 struct ctdb_call_state
*state
;
343 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
344 struct ctdb_ltdb_header header
;
347 DEBUG(DEBUG_DEBUG
,("pnn %u dmaster response %08x\n", ctdb
->pnn
, ctdb_hash(&key
)));
351 header
.dmaster
= ctdb
->pnn
;
352 header
.flags
= record_flags
;
354 state
= reqid_find(ctdb
->idr
, hdr
->reqid
, struct ctdb_call_state
);
357 if (state
->call
->flags
& CTDB_CALL_FLAG_VACUUM_MIGRATION
) {
359 * We temporarily add the VACUUM_MIGRATED flag to
360 * the record flags, so that ctdb_ltdb_store can
361 * decide whether the record should be stored or
364 header
.flags
|= CTDB_REC_FLAG_VACUUM_MIGRATED
;
368 if (ctdb_ltdb_store(ctdb_db
, key
, &header
, data
) != 0) {
369 ctdb_fatal(ctdb
, "ctdb_reply_dmaster store failed\n");
371 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
373 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
378 /* we just became DMASTER and this database is "sticky",
379 see if the record is flagged as "hot" and set up a pin-down
380 context to stop migrations for a little while if so
382 if (ctdb_db
->sticky
) {
383 ctdb_set_sticky_pindown(ctdb
, ctdb_db
, key
);
387 DEBUG(DEBUG_ERR
,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
388 ctdb
->pnn
, hdr
->reqid
, hdr
->srcnode
));
390 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
392 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
397 if (key
.dsize
!= state
->call
->key
.dsize
|| memcmp(key
.dptr
, state
->call
->key
.dptr
, key
.dsize
)) {
398 DEBUG(DEBUG_ERR
, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr
->reqid
, hdr
->srcnode
));
400 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
402 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
407 if (hdr
->reqid
!= state
->reqid
) {
408 /* we found a record but it was the wrong one */
409 DEBUG(DEBUG_ERR
, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr
->reqid
, hdr
->srcnode
));
411 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
413 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
418 ctdb_call_local(ctdb_db
, state
->call
, &header
, state
, &data
, true);
420 ret
= ctdb_ltdb_unlock(ctdb_db
, state
->call
->key
);
422 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
425 state
->state
= CTDB_CALL_DONE
;
426 if (state
->async
.fn
) {
427 state
->async
.fn(state
);
431 struct dmaster_defer_call
{
432 struct dmaster_defer_call
*next
, *prev
;
433 struct ctdb_context
*ctdb
;
434 struct ctdb_req_header
*hdr
;
437 struct dmaster_defer_queue
{
438 struct ctdb_db_context
*ctdb_db
;
440 struct dmaster_defer_call
*deferred_calls
;
443 static void dmaster_defer_reprocess(struct tevent_context
*ev
,
444 struct tevent_timer
*te
,
448 struct dmaster_defer_call
*call
= talloc_get_type(
449 private_data
, struct dmaster_defer_call
);
451 ctdb_input_pkt(call
->ctdb
, call
->hdr
);
455 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue
*ddq
)
457 /* Ignore requests, if database recovery happens in-between. */
458 if (ddq
->generation
!= ddq
->ctdb_db
->generation
) {
462 while (ddq
->deferred_calls
!= NULL
) {
463 struct dmaster_defer_call
*call
= ddq
->deferred_calls
;
465 DLIST_REMOVE(ddq
->deferred_calls
, call
);
467 talloc_steal(call
->ctdb
, call
);
468 tevent_add_timer(call
->ctdb
->ev
, call
, timeval_zero(),
469 dmaster_defer_reprocess
, call
);
474 static void *insert_ddq_callback(void *parm
, void *data
)
483 * This function is used to reigster a key in database that needs to be updated.
484 * Any requests for that key should get deferred till this is completed.
486 static int dmaster_defer_setup(struct ctdb_db_context
*ctdb_db
,
487 struct ctdb_req_header
*hdr
,
491 struct dmaster_defer_queue
*ddq
;
493 k
= ctdb_key_to_idkey(hdr
, key
);
495 DEBUG(DEBUG_ERR
, ("Failed to allocate key for dmaster defer setup\n"));
500 ddq
= trbt_lookuparray32(ctdb_db
->defer_dmaster
, k
[0], k
);
502 if (ddq
->generation
== ctdb_db
->generation
) {
507 /* Recovery ocurred - get rid of old queue. All the deferred
508 * requests will be resent anyway from ctdb_call_resend_db.
513 ddq
= talloc(hdr
, struct dmaster_defer_queue
);
515 DEBUG(DEBUG_ERR
, ("Failed to allocate dmaster defer queue\n"));
519 ddq
->ctdb_db
= ctdb_db
;
520 ddq
->generation
= hdr
->generation
;
521 ddq
->deferred_calls
= NULL
;
523 trbt_insertarray32_callback(ctdb_db
->defer_dmaster
, k
[0], k
,
524 insert_ddq_callback
, ddq
);
525 talloc_set_destructor(ddq
, dmaster_defer_queue_destructor
);
531 static int dmaster_defer_add(struct ctdb_db_context
*ctdb_db
,
532 struct ctdb_req_header
*hdr
,
535 struct dmaster_defer_queue
*ddq
;
536 struct dmaster_defer_call
*call
;
539 k
= ctdb_key_to_idkey(hdr
, key
);
541 DEBUG(DEBUG_ERR
, ("Failed to allocate key for dmaster defer add\n"));
545 ddq
= trbt_lookuparray32(ctdb_db
->defer_dmaster
, k
[0], k
);
553 if (ddq
->generation
!= hdr
->generation
) {
554 talloc_set_destructor(ddq
, NULL
);
559 call
= talloc(ddq
, struct dmaster_defer_call
);
561 DEBUG(DEBUG_ERR
, ("Failed to allocate dmaster defer call\n"));
565 call
->ctdb
= ctdb_db
->ctdb
;
566 call
->hdr
= talloc_steal(call
, hdr
);
568 DLIST_ADD_END(ddq
->deferred_calls
, call
);
574 called when a CTDB_REQ_DMASTER packet comes in
576 this comes into the lmaster for a record when the current dmaster
577 wants to give up the dmaster role and give it to someone else
579 void ctdb_request_dmaster(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
581 struct ctdb_req_dmaster_old
*c
= (struct ctdb_req_dmaster_old
*)hdr
;
582 TDB_DATA key
, data
, data2
;
583 struct ctdb_ltdb_header header
;
584 struct ctdb_db_context
*ctdb_db
;
585 uint32_t record_flags
= 0;
590 key
.dsize
= c
->keylen
;
591 data
.dptr
= c
->data
+ c
->keylen
;
592 data
.dsize
= c
->datalen
;
593 len
= offsetof(struct ctdb_req_dmaster_old
, data
) + key
.dsize
+ data
.dsize
595 if (len
<= c
->hdr
.length
) {
596 memcpy(&record_flags
, &c
->data
[c
->keylen
+ c
->datalen
],
597 sizeof(record_flags
));
600 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
602 ctdb_send_error(ctdb
, hdr
, -1,
603 "Unknown database in request. db_id==0x%08x",
608 dmaster_defer_setup(ctdb_db
, hdr
, key
);
610 /* fetch the current record */
611 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, key
, &header
, hdr
, &data2
,
612 ctdb_call_input_pkt
, ctdb
, false);
614 ctdb_fatal(ctdb
, "ctdb_req_dmaster failed to fetch record");
618 DEBUG(DEBUG_INFO
,(__location__
" deferring ctdb_request_dmaster\n"));
622 if (ctdb_lmaster(ctdb
, &key
) != ctdb
->pnn
) {
623 DEBUG(DEBUG_ERR
, ("dmaster request to non-lmaster "
624 "db=%s lmaster=%u gen=%u curgen=%u\n",
625 ctdb_db
->db_name
, ctdb_lmaster(ctdb
, &key
),
626 hdr
->generation
, ctdb_db
->generation
));
627 ctdb_fatal(ctdb
, "ctdb_req_dmaster to non-lmaster");
630 DEBUG(DEBUG_DEBUG
,("pnn %u dmaster request on %08x for %u from %u\n",
631 ctdb
->pnn
, ctdb_hash(&key
), c
->dmaster
, c
->hdr
.srcnode
));
633 /* its a protocol error if the sending node is not the current dmaster */
634 if (header
.dmaster
!= hdr
->srcnode
) {
635 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
636 ctdb
->pnn
, c
->dmaster
, hdr
->srcnode
, header
.dmaster
, ctdb_hash(&key
),
637 ctdb_db
->db_id
, hdr
->generation
, ctdb
->vnn_map
->generation
,
638 (unsigned long long)c
->rsn
, (unsigned long long)header
.rsn
, c
->hdr
.reqid
,
639 (key
.dsize
>= 4)?(*(uint32_t *)key
.dptr
):0));
640 if (header
.rsn
!= 0 || header
.dmaster
!= ctdb
->pnn
) {
641 DEBUG(DEBUG_ERR
,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
643 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
644 ctdb_ltdb_unlock(ctdb_db
, key
);
649 if (header
.rsn
> c
->rsn
) {
650 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
651 ctdb
->pnn
, c
->dmaster
, hdr
->srcnode
, header
.dmaster
, ctdb_hash(&key
),
652 ctdb_db
->db_id
, hdr
->generation
, ctdb
->vnn_map
->generation
,
653 (unsigned long long)c
->rsn
, (unsigned long long)header
.rsn
, c
->hdr
.reqid
));
656 /* use the rsn from the sending node */
659 /* store the record flags from the sending node */
660 header
.flags
= record_flags
;
662 /* check if the new dmaster is the lmaster, in which case we
663 skip the dmaster reply */
664 if (c
->dmaster
== ctdb
->pnn
) {
665 ctdb_become_dmaster(ctdb_db
, hdr
, key
, data
, c
->rsn
, record_flags
);
667 ctdb_send_dmaster_reply(ctdb_db
, &header
, key
, data
, c
->dmaster
, hdr
->reqid
);
669 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
671 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
676 static void ctdb_sticky_record_timeout(struct tevent_context
*ev
,
677 struct tevent_timer
*te
,
678 struct timeval t
, void *private_data
)
680 struct ctdb_sticky_record
*sr
= talloc_get_type(private_data
,
681 struct ctdb_sticky_record
);
685 static void *ctdb_make_sticky_record_callback(void *parm
, void *data
)
688 DEBUG(DEBUG_ERR
,("Already have sticky record registered. Free old %p and create new %p\n", data
, parm
));
695 ctdb_make_record_sticky(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
697 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
699 struct ctdb_sticky_record
*sr
;
701 k
= ctdb_key_to_idkey(tmp_ctx
, key
);
703 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
704 talloc_free(tmp_ctx
);
708 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
710 talloc_free(tmp_ctx
);
714 sr
= talloc(ctdb_db
->sticky_records
, struct ctdb_sticky_record
);
716 talloc_free(tmp_ctx
);
717 DEBUG(DEBUG_ERR
,("Failed to allocate sticky record structure\n"));
722 sr
->ctdb_db
= ctdb_db
;
725 DEBUG(DEBUG_ERR
,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
726 ctdb
->tunable
.sticky_duration
,
727 ctdb_db
->db_name
, ctdb_hash(&key
)));
729 trbt_insertarray32_callback(ctdb_db
->sticky_records
, k
[0], &k
[0], ctdb_make_sticky_record_callback
, sr
);
731 tevent_add_timer(ctdb
->ev
, sr
,
732 timeval_current_ofs(ctdb
->tunable
.sticky_duration
, 0),
733 ctdb_sticky_record_timeout
, sr
);
735 talloc_free(tmp_ctx
);
739 struct pinned_down_requeue_handle
{
740 struct ctdb_context
*ctdb
;
741 struct ctdb_req_header
*hdr
;
744 struct pinned_down_deferred_call
{
745 struct ctdb_context
*ctdb
;
746 struct ctdb_req_header
*hdr
;
749 static void pinned_down_requeue(struct tevent_context
*ev
,
750 struct tevent_timer
*te
,
751 struct timeval t
, void *private_data
)
753 struct pinned_down_requeue_handle
*handle
= talloc_get_type(private_data
, struct pinned_down_requeue_handle
);
754 struct ctdb_context
*ctdb
= handle
->ctdb
;
756 talloc_steal(ctdb
, handle
->hdr
);
757 ctdb_call_input_pkt(ctdb
, handle
->hdr
);
762 static int pinned_down_destructor(struct pinned_down_deferred_call
*pinned_down
)
764 struct ctdb_context
*ctdb
= pinned_down
->ctdb
;
765 struct pinned_down_requeue_handle
*handle
= talloc(ctdb
, struct pinned_down_requeue_handle
);
767 handle
->ctdb
= pinned_down
->ctdb
;
768 handle
->hdr
= pinned_down
->hdr
;
769 talloc_steal(handle
, handle
->hdr
);
771 tevent_add_timer(ctdb
->ev
, handle
, timeval_zero(),
772 pinned_down_requeue
, handle
);
778 ctdb_defer_pinned_down_request(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_req_header
*hdr
)
780 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
782 struct ctdb_sticky_record
*sr
;
783 struct pinned_down_deferred_call
*pinned_down
;
785 k
= ctdb_key_to_idkey(tmp_ctx
, key
);
787 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
788 talloc_free(tmp_ctx
);
792 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
794 talloc_free(tmp_ctx
);
798 talloc_free(tmp_ctx
);
800 if (sr
->pindown
== NULL
) {
804 pinned_down
= talloc(sr
->pindown
, struct pinned_down_deferred_call
);
805 if (pinned_down
== NULL
) {
806 DEBUG(DEBUG_ERR
,("Failed to allocate structure for deferred pinned down request\n"));
810 pinned_down
->ctdb
= ctdb
;
811 pinned_down
->hdr
= hdr
;
813 talloc_set_destructor(pinned_down
, pinned_down_destructor
);
814 talloc_steal(pinned_down
, hdr
);
820 ctdb_update_db_stat_hot_keys(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, int hopcount
)
824 /* smallest value is always at index 0 */
825 if (hopcount
<= ctdb_db
->statistics
.hot_keys
[0].count
) {
829 /* see if we already know this key */
830 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
831 if (key
.dsize
!= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
) {
834 if (memcmp(key
.dptr
, ctdb_db
->statistics
.hot_keys
[i
].key
.dptr
, key
.dsize
)) {
837 /* found an entry for this key */
838 if (hopcount
<= ctdb_db
->statistics
.hot_keys
[i
].count
) {
841 ctdb_db
->statistics
.hot_keys
[i
].count
= hopcount
;
845 if (ctdb_db
->statistics
.num_hot_keys
< MAX_HOT_KEYS
) {
846 id
= ctdb_db
->statistics
.num_hot_keys
;
847 ctdb_db
->statistics
.num_hot_keys
++;
852 if (ctdb_db
->statistics
.hot_keys
[id
].key
.dptr
!= NULL
) {
853 talloc_free(ctdb_db
->statistics
.hot_keys
[id
].key
.dptr
);
855 ctdb_db
->statistics
.hot_keys
[id
].key
.dsize
= key
.dsize
;
856 ctdb_db
->statistics
.hot_keys
[id
].key
.dptr
= talloc_memdup(ctdb_db
, key
.dptr
, key
.dsize
);
857 ctdb_db
->statistics
.hot_keys
[id
].count
= hopcount
;
858 DEBUG(DEBUG_NOTICE
,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
859 ctdb_db
->db_name
, ctdb_hash(&key
), id
, hopcount
));
862 for (i
= 1; i
< MAX_HOT_KEYS
; i
++) {
863 if (ctdb_db
->statistics
.hot_keys
[i
].count
== 0) {
866 if (ctdb_db
->statistics
.hot_keys
[i
].count
< ctdb_db
->statistics
.hot_keys
[0].count
) {
867 hopcount
= ctdb_db
->statistics
.hot_keys
[i
].count
;
868 ctdb_db
->statistics
.hot_keys
[i
].count
= ctdb_db
->statistics
.hot_keys
[0].count
;
869 ctdb_db
->statistics
.hot_keys
[0].count
= hopcount
;
871 key
= ctdb_db
->statistics
.hot_keys
[i
].key
;
872 ctdb_db
->statistics
.hot_keys
[i
].key
= ctdb_db
->statistics
.hot_keys
[0].key
;
873 ctdb_db
->statistics
.hot_keys
[0].key
= key
;
879 called when a CTDB_REQ_CALL packet comes in
881 void ctdb_request_call(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
883 struct ctdb_req_call_old
*c
= (struct ctdb_req_call_old
*)hdr
;
885 struct ctdb_reply_call_old
*r
;
887 struct ctdb_ltdb_header header
;
888 struct ctdb_call
*call
;
889 struct ctdb_db_context
*ctdb_db
;
890 int tmp_count
, bucket
;
892 if (ctdb
->methods
== NULL
) {
893 DEBUG(DEBUG_INFO
,(__location__
" Failed ctdb_request_call. Transport is DOWN\n"));
898 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
900 ctdb_send_error(ctdb
, hdr
, -1,
901 "Unknown database in request. db_id==0x%08x",
906 call
= talloc(hdr
, struct ctdb_call
);
907 CTDB_NO_MEMORY_FATAL(ctdb
, call
);
909 call
->call_id
= c
->callid
;
910 call
->key
.dptr
= c
->data
;
911 call
->key
.dsize
= c
->keylen
;
912 call
->call_data
.dptr
= c
->data
+ c
->keylen
;
913 call
->call_data
.dsize
= c
->calldatalen
;
914 call
->reply_data
.dptr
= NULL
;
915 call
->reply_data
.dsize
= 0;
918 /* If this record is pinned down we should defer the
919 request until the pindown times out
921 if (ctdb_db
->sticky
) {
922 if (ctdb_defer_pinned_down_request(ctdb
, ctdb_db
, call
->key
, hdr
) == 0) {
924 ("Defer request for pinned down record in %s\n", ctdb_db
->db_name
));
930 if (dmaster_defer_add(ctdb_db
, hdr
, call
->key
) == 0) {
935 /* determine if we are the dmaster for this key. This also
936 fetches the record data (if any), thus avoiding a 2nd fetch of the data
937 if the call will be answered locally */
939 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, call
->key
, &header
, hdr
, &data
,
940 ctdb_call_input_pkt
, ctdb
, false);
942 ctdb_send_error(ctdb
, hdr
, ret
, "ltdb fetch failed in ctdb_request_call");
947 DEBUG(DEBUG_INFO
,(__location__
" deferred ctdb_request_call\n"));
952 /* Dont do READONLY if we don't have a tracking database */
953 if ((c
->flags
& CTDB_WANT_READONLY
) && !ctdb_db
->readonly
) {
954 c
->flags
&= ~CTDB_WANT_READONLY
;
957 if (header
.flags
& CTDB_REC_RO_REVOKE_COMPLETE
) {
958 header
.flags
&= ~CTDB_REC_RO_FLAGS
;
959 CTDB_INCREMENT_STAT(ctdb
, total_ro_revokes
);
960 CTDB_INCREMENT_DB_STAT(ctdb_db
, db_ro_revokes
);
961 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
962 ctdb_fatal(ctdb
, "Failed to write header with cleared REVOKE flag");
964 /* and clear out the tracking data */
965 if (tdb_delete(ctdb_db
->rottdb
, call
->key
) != 0) {
966 DEBUG(DEBUG_ERR
,(__location__
" Failed to clear out trackingdb record\n"));
970 /* if we are revoking, we must defer all other calls until the revoke
973 if (header
.flags
& CTDB_REC_RO_REVOKING_READONLY
) {
974 talloc_free(data
.dptr
);
975 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
977 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, call
->key
, hdr
, ctdb_call_input_pkt
, ctdb
) != 0) {
978 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
985 * If we are not the dmaster and are not hosting any delegations,
986 * then we redirect the request to the node than can answer it
987 * (the lmaster or the dmaster).
989 if ((header
.dmaster
!= ctdb
->pnn
)
990 && (!(header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
)) ) {
991 talloc_free(data
.dptr
);
992 ctdb_call_send_redirect(ctdb
, ctdb_db
, call
->key
, c
, &header
);
994 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
996 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
1002 if ( (!(c
->flags
& CTDB_WANT_READONLY
))
1003 && (header
.flags
& (CTDB_REC_RO_HAVE_DELEGATIONS
|CTDB_REC_RO_HAVE_READONLY
)) ) {
1004 header
.flags
|= CTDB_REC_RO_REVOKING_READONLY
;
1005 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
1006 ctdb_fatal(ctdb
, "Failed to store record with HAVE_DELEGATIONS set");
1008 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
1010 if (ctdb_start_revoke_ro_record(ctdb
, ctdb_db
, call
->key
, &header
, data
) != 0) {
1011 ctdb_fatal(ctdb
, "Failed to start record revoke");
1013 talloc_free(data
.dptr
);
1015 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, call
->key
, hdr
, ctdb_call_input_pkt
, ctdb
) != 0) {
1016 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
1023 /* If this is the first request for delegation. bump rsn and set
1024 * the delegations flag
1026 if ((c
->flags
& CTDB_WANT_READONLY
)
1027 && (c
->callid
== CTDB_FETCH_WITH_HEADER_FUNC
)
1028 && (!(header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
))) {
1030 header
.flags
|= CTDB_REC_RO_HAVE_DELEGATIONS
;
1031 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
1032 ctdb_fatal(ctdb
, "Failed to store record with HAVE_DELEGATIONS set");
1035 if ((c
->flags
& CTDB_WANT_READONLY
)
1036 && (call
->call_id
== CTDB_FETCH_WITH_HEADER_FUNC
)) {
1039 tdata
= tdb_fetch(ctdb_db
->rottdb
, call
->key
);
1040 if (ctdb_trackingdb_add_pnn(ctdb
, &tdata
, c
->hdr
.srcnode
) != 0) {
1041 ctdb_fatal(ctdb
, "Failed to add node to trackingdb");
1043 if (tdb_store(ctdb_db
->rottdb
, call
->key
, tdata
, TDB_REPLACE
) != 0) {
1044 ctdb_fatal(ctdb
, "Failed to store trackingdb data");
1048 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
1050 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
1053 len
= offsetof(struct ctdb_reply_call_old
, data
) + data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
1054 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REPLY_CALL
, len
,
1055 struct ctdb_reply_call_old
);
1056 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
1057 r
->hdr
.destnode
= c
->hdr
.srcnode
;
1058 r
->hdr
.reqid
= c
->hdr
.reqid
;
1059 r
->hdr
.generation
= ctdb_db
->generation
;
1061 r
->datalen
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
1063 header
.flags
|= CTDB_REC_RO_HAVE_READONLY
;
1064 header
.flags
&= ~CTDB_REC_RO_HAVE_DELEGATIONS
;
1065 memcpy(&r
->data
[0], &header
, sizeof(struct ctdb_ltdb_header
));
1068 memcpy(&r
->data
[sizeof(struct ctdb_ltdb_header
)], data
.dptr
, data
.dsize
);
1071 ctdb_queue_packet(ctdb
, &r
->hdr
);
1072 CTDB_INCREMENT_STAT(ctdb
, total_ro_delegations
);
1073 CTDB_INCREMENT_DB_STAT(ctdb_db
, db_ro_delegations
);
1080 CTDB_UPDATE_STAT(ctdb
, max_hop_count
, c
->hopcount
);
1081 tmp_count
= c
->hopcount
;
1087 if (bucket
>= MAX_COUNT_BUCKETS
) {
1088 bucket
= MAX_COUNT_BUCKETS
- 1;
1090 CTDB_INCREMENT_STAT(ctdb
, hop_count_bucket
[bucket
]);
1091 CTDB_INCREMENT_DB_STAT(ctdb_db
, hop_count_bucket
[bucket
]);
1092 ctdb_update_db_stat_hot_keys(ctdb_db
, call
->key
, c
->hopcount
);
1094 /* If this database supports sticky records, then check if the
1095 hopcount is big. If it is it means the record is hot and we
1096 should make it sticky.
1098 if (ctdb_db
->sticky
&& c
->hopcount
>= ctdb
->tunable
.hopcount_make_sticky
) {
1099 ctdb_make_record_sticky(ctdb
, ctdb_db
, call
->key
);
1103 /* Try if possible to migrate the record off to the caller node.
1104 * From the clients perspective a fetch of the data is just as
1105 * expensive as a migration.
1107 if (c
->hdr
.srcnode
!= ctdb
->pnn
) {
1108 if (ctdb_db
->persistent_state
) {
1109 DEBUG(DEBUG_INFO
, (__location__
" refusing migration"
1110 " of key %s while transaction is active\n",
1111 (char *)call
->key
.dptr
));
1113 DEBUG(DEBUG_DEBUG
,("pnn %u starting migration of %08x to %u\n",
1114 ctdb
->pnn
, ctdb_hash(&(call
->key
)), c
->hdr
.srcnode
));
1115 ctdb_call_send_dmaster(ctdb_db
, c
, &header
, &(call
->key
), &data
);
1116 talloc_free(data
.dptr
);
1118 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
1120 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
1127 ret
= ctdb_call_local(ctdb_db
, call
, &header
, hdr
, &data
, true);
1129 DEBUG(DEBUG_ERR
,(__location__
" ctdb_call_local failed\n"));
1133 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
1135 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
1138 len
= offsetof(struct ctdb_reply_call_old
, data
) + call
->reply_data
.dsize
;
1139 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REPLY_CALL
, len
,
1140 struct ctdb_reply_call_old
);
1141 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
1142 r
->hdr
.destnode
= hdr
->srcnode
;
1143 r
->hdr
.reqid
= hdr
->reqid
;
1144 r
->hdr
.generation
= ctdb_db
->generation
;
1145 r
->status
= call
->status
;
1146 r
->datalen
= call
->reply_data
.dsize
;
1147 if (call
->reply_data
.dsize
) {
1148 memcpy(&r
->data
[0], call
->reply_data
.dptr
, call
->reply_data
.dsize
);
1151 ctdb_queue_packet(ctdb
, &r
->hdr
);
1158 * called when a CTDB_REPLY_CALL packet comes in
1160 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1161 * contains any reply data from the call
1163 void ctdb_reply_call(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1165 struct ctdb_reply_call_old
*c
= (struct ctdb_reply_call_old
*)hdr
;
1166 struct ctdb_call_state
*state
;
1168 state
= reqid_find(ctdb
->idr
, hdr
->reqid
, struct ctdb_call_state
);
1169 if (state
== NULL
) {
1170 DEBUG(DEBUG_ERR
, (__location__
" reqid %u not found\n", hdr
->reqid
));
1174 if (hdr
->reqid
!= state
->reqid
) {
1175 /* we found a record but it was the wrong one */
1176 DEBUG(DEBUG_ERR
, ("Dropped orphaned call reply with reqid:%u\n",hdr
->reqid
));
1181 /* read only delegation processing */
1182 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1183 * delegation since we may need to update the record header
1185 if (state
->c
->callid
== CTDB_FETCH_WITH_HEADER_FUNC
) {
1186 struct ctdb_db_context
*ctdb_db
= state
->ctdb_db
;
1187 struct ctdb_ltdb_header
*header
= (struct ctdb_ltdb_header
*)&c
->data
[0];
1188 struct ctdb_ltdb_header oldheader
;
1189 TDB_DATA key
, data
, olddata
;
1192 if (!(header
->flags
& CTDB_REC_RO_HAVE_READONLY
)) {
1197 key
.dsize
= state
->c
->keylen
;
1198 key
.dptr
= state
->c
->data
;
1199 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
,
1200 ctdb_call_input_pkt
, ctdb
, false);
1205 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock in ctdb_reply_call\n"));
1209 ret
= ctdb_ltdb_fetch(ctdb_db
, key
, &oldheader
, state
, &olddata
);
1211 DEBUG(DEBUG_ERR
, ("Failed to fetch old record in ctdb_reply_call\n"));
1212 ctdb_ltdb_unlock(ctdb_db
, key
);
1216 if (header
->rsn
<= oldheader
.rsn
) {
1217 ctdb_ltdb_unlock(ctdb_db
, key
);
1221 if (c
->datalen
< sizeof(struct ctdb_ltdb_header
)) {
1222 DEBUG(DEBUG_ERR
,(__location__
" Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c
->datalen
));
1223 ctdb_ltdb_unlock(ctdb_db
, key
);
1227 data
.dsize
= c
->datalen
- sizeof(struct ctdb_ltdb_header
);
1228 data
.dptr
= &c
->data
[sizeof(struct ctdb_ltdb_header
)];
1229 ret
= ctdb_ltdb_store(ctdb_db
, key
, header
, data
);
1231 DEBUG(DEBUG_ERR
, ("Failed to store new record in ctdb_reply_call\n"));
1232 ctdb_ltdb_unlock(ctdb_db
, key
);
1236 ctdb_ltdb_unlock(ctdb_db
, key
);
1240 state
->call
->reply_data
.dptr
= c
->data
;
1241 state
->call
->reply_data
.dsize
= c
->datalen
;
1242 state
->call
->status
= c
->status
;
1244 talloc_steal(state
, c
);
1246 state
->state
= CTDB_CALL_DONE
;
1247 if (state
->async
.fn
) {
1248 state
->async
.fn(state
);
1254 * called when a CTDB_REPLY_DMASTER packet comes in
1256 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1257 * request packet. It means that the current dmaster wants to give us
1260 void ctdb_reply_dmaster(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1262 struct ctdb_reply_dmaster_old
*c
= (struct ctdb_reply_dmaster_old
*)hdr
;
1263 struct ctdb_db_context
*ctdb_db
;
1265 uint32_t record_flags
= 0;
1269 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
1270 if (ctdb_db
== NULL
) {
1271 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c
->db_id
));
1276 key
.dsize
= c
->keylen
;
1277 data
.dptr
= &c
->data
[key
.dsize
];
1278 data
.dsize
= c
->datalen
;
1279 len
= offsetof(struct ctdb_reply_dmaster_old
, data
) + key
.dsize
+ data
.dsize
1281 if (len
<= c
->hdr
.length
) {
1282 memcpy(&record_flags
, &c
->data
[c
->keylen
+ c
->datalen
],
1283 sizeof(record_flags
));
1286 dmaster_defer_setup(ctdb_db
, hdr
, key
);
1288 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
,
1289 ctdb_call_input_pkt
, ctdb
, false);
1294 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock in ctdb_reply_dmaster\n"));
1298 ctdb_become_dmaster(ctdb_db
, hdr
, key
, data
, c
->rsn
, record_flags
);
1303 called when a CTDB_REPLY_ERROR packet comes in
1305 void ctdb_reply_error(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1307 struct ctdb_reply_error_old
*c
= (struct ctdb_reply_error_old
*)hdr
;
1308 struct ctdb_call_state
*state
;
1310 state
= reqid_find(ctdb
->idr
, hdr
->reqid
, struct ctdb_call_state
);
1311 if (state
== NULL
) {
1312 DEBUG(DEBUG_ERR
,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1313 ctdb
->pnn
, hdr
->reqid
));
1317 if (hdr
->reqid
!= state
->reqid
) {
1318 /* we found a record but it was the wrong one */
1319 DEBUG(DEBUG_ERR
, ("Dropped orphaned error reply with reqid:%u\n",hdr
->reqid
));
1323 talloc_steal(state
, c
);
1325 state
->state
= CTDB_CALL_ERROR
;
1326 state
->errmsg
= (char *)c
->msg
;
1327 if (state
->async
.fn
) {
1328 state
->async
.fn(state
);
1336 static int ctdb_call_destructor(struct ctdb_call_state
*state
)
1338 DLIST_REMOVE(state
->ctdb_db
->pending_calls
, state
);
1339 reqid_remove(state
->ctdb_db
->ctdb
->idr
, state
->reqid
);
1345 called when a ctdb_call needs to be resent after a reconfigure event
1347 static void ctdb_call_resend(struct ctdb_call_state
*state
)
1349 struct ctdb_context
*ctdb
= state
->ctdb_db
->ctdb
;
1351 state
->generation
= state
->ctdb_db
->generation
;
1353 /* use a new reqid, in case the old reply does eventually come in */
1354 reqid_remove(ctdb
->idr
, state
->reqid
);
1355 state
->reqid
= reqid_new(ctdb
->idr
, state
);
1356 state
->c
->hdr
.reqid
= state
->reqid
;
1358 /* update the generation count for this request, so its valid with the new vnn_map */
1359 state
->c
->hdr
.generation
= state
->generation
;
1361 /* send the packet to ourselves, it will be redirected appropriately */
1362 state
->c
->hdr
.destnode
= ctdb
->pnn
;
1364 ctdb_queue_packet(ctdb
, &state
->c
->hdr
);
1365 DEBUG(DEBUG_NOTICE
,("resent ctdb_call for db %s reqid %u generation %u\n",
1366 state
->ctdb_db
->db_name
, state
->reqid
, state
->generation
));
1370 resend all pending calls on recovery
1372 void ctdb_call_resend_db(struct ctdb_db_context
*ctdb_db
)
1374 struct ctdb_call_state
*state
, *next
;
1376 for (state
= ctdb_db
->pending_calls
; state
; state
= next
) {
1378 ctdb_call_resend(state
);
1382 void ctdb_call_resend_all(struct ctdb_context
*ctdb
)
1384 struct ctdb_db_context
*ctdb_db
;
1386 for (ctdb_db
= ctdb
->db_list
; ctdb_db
; ctdb_db
= ctdb_db
->next
) {
1387 ctdb_call_resend_db(ctdb_db
);
1392 this allows the caller to setup a async.fn
1394 static void call_local_trigger(struct tevent_context
*ev
,
1395 struct tevent_timer
*te
,
1396 struct timeval t
, void *private_data
)
1398 struct ctdb_call_state
*state
= talloc_get_type(private_data
, struct ctdb_call_state
);
1399 if (state
->async
.fn
) {
1400 state
->async
.fn(state
);
1406 construct an event driven local ctdb_call
1408 this is used so that locally processed ctdb_call requests are processed
1409 in an event driven manner
1411 struct ctdb_call_state
*ctdb_call_local_send(struct ctdb_db_context
*ctdb_db
,
1412 struct ctdb_call
*call
,
1413 struct ctdb_ltdb_header
*header
,
1416 struct ctdb_call_state
*state
;
1417 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1420 state
= talloc_zero(ctdb_db
, struct ctdb_call_state
);
1421 CTDB_NO_MEMORY_NULL(ctdb
, state
);
1423 talloc_steal(state
, data
->dptr
);
1425 state
->state
= CTDB_CALL_DONE
;
1426 state
->call
= talloc(state
, struct ctdb_call
);
1427 CTDB_NO_MEMORY_NULL(ctdb
, state
->call
);
1428 *(state
->call
) = *call
;
1429 state
->ctdb_db
= ctdb_db
;
1431 ret
= ctdb_call_local(ctdb_db
, state
->call
, header
, state
, data
, true);
1433 DEBUG(DEBUG_DEBUG
,("ctdb_call_local() failed, ignoring return code %d\n", ret
));
1436 tevent_add_timer(ctdb
->ev
, state
, timeval_zero(),
1437 call_local_trigger
, state
);
1444 make a remote ctdb call - async send. Called in daemon context.
1446 This constructs a ctdb_call request and queues it for processing.
1447 This call never blocks.
1449 struct ctdb_call_state
*ctdb_daemon_call_send_remote(struct ctdb_db_context
*ctdb_db
,
1450 struct ctdb_call
*call
,
1451 struct ctdb_ltdb_header
*header
)
1454 struct ctdb_call_state
*state
;
1455 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1457 if (ctdb
->methods
== NULL
) {
1458 DEBUG(DEBUG_INFO
,(__location__
" Failed send packet. Transport is down\n"));
1462 state
= talloc_zero(ctdb_db
, struct ctdb_call_state
);
1463 CTDB_NO_MEMORY_NULL(ctdb
, state
);
1464 state
->call
= talloc(state
, struct ctdb_call
);
1465 CTDB_NO_MEMORY_NULL(ctdb
, state
->call
);
1467 state
->reqid
= reqid_new(ctdb
->idr
, state
);
1468 state
->ctdb_db
= ctdb_db
;
1469 talloc_set_destructor(state
, ctdb_call_destructor
);
1471 len
= offsetof(struct ctdb_req_call_old
, data
) + call
->key
.dsize
+ call
->call_data
.dsize
;
1472 state
->c
= ctdb_transport_allocate(ctdb
, state
, CTDB_REQ_CALL
, len
,
1473 struct ctdb_req_call_old
);
1474 CTDB_NO_MEMORY_NULL(ctdb
, state
->c
);
1475 state
->c
->hdr
.destnode
= header
->dmaster
;
1477 /* this limits us to 16k outstanding messages - not unreasonable */
1478 state
->c
->hdr
.reqid
= state
->reqid
;
1479 state
->c
->hdr
.generation
= ctdb_db
->generation
;
1480 state
->c
->flags
= call
->flags
;
1481 state
->c
->db_id
= ctdb_db
->db_id
;
1482 state
->c
->callid
= call
->call_id
;
1483 state
->c
->hopcount
= 0;
1484 state
->c
->keylen
= call
->key
.dsize
;
1485 state
->c
->calldatalen
= call
->call_data
.dsize
;
1486 memcpy(&state
->c
->data
[0], call
->key
.dptr
, call
->key
.dsize
);
1487 memcpy(&state
->c
->data
[call
->key
.dsize
],
1488 call
->call_data
.dptr
, call
->call_data
.dsize
);
1489 *(state
->call
) = *call
;
1490 state
->call
->call_data
.dptr
= &state
->c
->data
[call
->key
.dsize
];
1491 state
->call
->key
.dptr
= &state
->c
->data
[0];
1493 state
->state
= CTDB_CALL_WAIT
;
1494 state
->generation
= ctdb_db
->generation
;
1496 DLIST_ADD(ctdb_db
->pending_calls
, state
);
1498 ctdb_queue_packet(ctdb
, &state
->c
->hdr
);
1504 make a remote ctdb call - async recv - called in daemon context
1506 This is called when the program wants to wait for a ctdb_call to complete and get the
1507 results. This call will block unless the call has already completed.
1509 int ctdb_daemon_call_recv(struct ctdb_call_state
*state
, struct ctdb_call
*call
)
1511 while (state
->state
< CTDB_CALL_DONE
) {
1512 tevent_loop_once(state
->ctdb_db
->ctdb
->ev
);
1514 if (state
->state
!= CTDB_CALL_DONE
) {
1515 ctdb_set_error(state
->ctdb_db
->ctdb
, "%s", state
->errmsg
);
1520 if (state
->call
->reply_data
.dsize
) {
1521 call
->reply_data
.dptr
= talloc_memdup(call
,
1522 state
->call
->reply_data
.dptr
,
1523 state
->call
->reply_data
.dsize
);
1524 call
->reply_data
.dsize
= state
->call
->reply_data
.dsize
;
1526 call
->reply_data
.dptr
= NULL
;
1527 call
->reply_data
.dsize
= 0;
1529 call
->status
= state
->call
->status
;
1536 send a keepalive packet to the other node
1538 void ctdb_send_keepalive(struct ctdb_context
*ctdb
, uint32_t destnode
)
1540 struct ctdb_req_keepalive_old
*r
;
1542 if (ctdb
->methods
== NULL
) {
1543 DEBUG(DEBUG_INFO
,(__location__
" Failed to send keepalive. Transport is DOWN\n"));
1547 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_KEEPALIVE
,
1548 sizeof(struct ctdb_req_keepalive_old
),
1549 struct ctdb_req_keepalive_old
);
1550 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
1551 r
->hdr
.destnode
= destnode
;
1554 CTDB_INCREMENT_STAT(ctdb
, keepalive_packets_sent
);
1556 ctdb_queue_packet(ctdb
, &r
->hdr
);
1563 struct revokechild_deferred_call
{
1564 struct ctdb_context
*ctdb
;
1565 struct ctdb_req_header
*hdr
;
1566 deferred_requeue_fn fn
;
1570 struct revokechild_handle
{
1571 struct revokechild_handle
*next
, *prev
;
1572 struct ctdb_context
*ctdb
;
1573 struct ctdb_db_context
*ctdb_db
;
1574 struct tevent_fd
*fde
;
1581 struct revokechild_requeue_handle
{
1582 struct ctdb_context
*ctdb
;
1583 struct ctdb_req_header
*hdr
;
1584 deferred_requeue_fn fn
;
1588 static void deferred_call_requeue(struct tevent_context
*ev
,
1589 struct tevent_timer
*te
,
1590 struct timeval t
, void *private_data
)
1592 struct revokechild_requeue_handle
*requeue_handle
= talloc_get_type(private_data
, struct revokechild_requeue_handle
);
1594 requeue_handle
->fn(requeue_handle
->ctx
, requeue_handle
->hdr
);
1595 talloc_free(requeue_handle
);
1598 static int deferred_call_destructor(struct revokechild_deferred_call
*deferred_call
)
1600 struct ctdb_context
*ctdb
= deferred_call
->ctdb
;
1601 struct revokechild_requeue_handle
*requeue_handle
= talloc(ctdb
, struct revokechild_requeue_handle
);
1602 struct ctdb_req_call_old
*c
= (struct ctdb_req_call_old
*)deferred_call
->hdr
;
1604 requeue_handle
->ctdb
= ctdb
;
1605 requeue_handle
->hdr
= deferred_call
->hdr
;
1606 requeue_handle
->fn
= deferred_call
->fn
;
1607 requeue_handle
->ctx
= deferred_call
->ctx
;
1608 talloc_steal(requeue_handle
, requeue_handle
->hdr
);
1610 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1611 tevent_add_timer(ctdb
->ev
, requeue_handle
,
1612 timeval_current_ofs(c
->flags
& CTDB_WANT_READONLY
? 1 : 0, 0),
1613 deferred_call_requeue
, requeue_handle
);
1619 static int revokechild_destructor(struct revokechild_handle
*rc
)
1621 if (rc
->fde
!= NULL
) {
1622 talloc_free(rc
->fde
);
1625 if (rc
->fd
[0] != -1) {
1628 if (rc
->fd
[1] != -1) {
1631 ctdb_kill(rc
->ctdb
, rc
->child
, SIGKILL
);
1633 DLIST_REMOVE(rc
->ctdb_db
->revokechild_active
, rc
);
1637 static void revokechild_handler(struct tevent_context
*ev
,
1638 struct tevent_fd
*fde
,
1639 uint16_t flags
, void *private_data
)
1641 struct revokechild_handle
*rc
= talloc_get_type(private_data
,
1642 struct revokechild_handle
);
1646 ret
= sys_read(rc
->fd
[0], &c
, 1);
1648 DEBUG(DEBUG_ERR
,("Failed to read status from revokechild. errno:%d\n", errno
));
1654 DEBUG(DEBUG_ERR
,("revokechild returned failure. status:%d\n", c
));
1663 struct ctdb_revoke_state
{
1664 struct ctdb_db_context
*ctdb_db
;
1666 struct ctdb_ltdb_header
*header
;
1673 static void update_record_cb(struct ctdb_client_control_state
*state
)
1675 struct ctdb_revoke_state
*revoke_state
;
1679 if (state
== NULL
) {
1682 revoke_state
= state
->async
.private_data
;
1684 state
->async
.fn
= NULL
;
1685 ret
= ctdb_control_recv(state
->ctdb
, state
, state
, NULL
, &res
, NULL
);
1686 if ((ret
!= 0) || (res
!= 0)) {
1687 DEBUG(DEBUG_ERR
,("Recv for revoke update record failed ret:%d res:%d\n", ret
, res
));
1688 revoke_state
->status
= -1;
1691 revoke_state
->count
--;
1692 if (revoke_state
->count
<= 0) {
1693 revoke_state
->finished
= 1;
1697 static void revoke_send_cb(struct ctdb_context
*ctdb
, uint32_t pnn
, void *private_data
)
1699 struct ctdb_revoke_state
*revoke_state
= private_data
;
1700 struct ctdb_client_control_state
*state
;
1702 state
= ctdb_ctrl_updaterecord_send(ctdb
, revoke_state
, timeval_current_ofs(ctdb
->tunable
.control_timeout
,0), pnn
, revoke_state
->ctdb_db
, revoke_state
->key
, revoke_state
->header
, revoke_state
->data
);
1703 if (state
== NULL
) {
1704 DEBUG(DEBUG_ERR
,("Failure to send update record to revoke readonly delegation\n"));
1705 revoke_state
->status
= -1;
1708 state
->async
.fn
= update_record_cb
;
1709 state
->async
.private_data
= revoke_state
;
1711 revoke_state
->count
++;
1715 static void ctdb_revoke_timeout_handler(struct tevent_context
*ev
,
1716 struct tevent_timer
*te
,
1717 struct timeval yt
, void *private_data
)
1719 struct ctdb_revoke_state
*state
= private_data
;
1721 DEBUG(DEBUG_ERR
,("Timed out waiting for revoke to finish\n"));
1722 state
->finished
= 1;
1726 static int ctdb_revoke_all_delegations(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA tdata
, TDB_DATA key
, struct ctdb_ltdb_header
*header
, TDB_DATA data
)
1728 struct ctdb_revoke_state
*state
= talloc_zero(ctdb
, struct ctdb_revoke_state
);
1729 struct ctdb_ltdb_header new_header
;
1732 state
->ctdb_db
= ctdb_db
;
1734 state
->header
= header
;
1737 ctdb_trackingdb_traverse(ctdb
, tdata
, revoke_send_cb
, state
);
1739 tevent_add_timer(ctdb
->ev
, state
,
1740 timeval_current_ofs(ctdb
->tunable
.control_timeout
, 0),
1741 ctdb_revoke_timeout_handler
, state
);
1743 while (state
->finished
== 0) {
1744 tevent_loop_once(ctdb
->ev
);
1747 if (ctdb_ltdb_lock(ctdb_db
, key
) != 0) {
1748 DEBUG(DEBUG_ERR
,("Failed to chainlock the database in revokechild\n"));
1752 if (ctdb_ltdb_fetch(ctdb_db
, key
, &new_header
, state
, &new_data
) != 0) {
1753 ctdb_ltdb_unlock(ctdb_db
, key
);
1754 DEBUG(DEBUG_ERR
,("Failed for fetch tdb record in revokechild\n"));
1759 if (new_header
.rsn
> header
->rsn
) {
1760 ctdb_ltdb_unlock(ctdb_db
, key
);
1761 DEBUG(DEBUG_ERR
,("RSN too high in tdb record in revokechild\n"));
1765 if ( (new_header
.flags
& (CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
)) != (CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
) ) {
1766 ctdb_ltdb_unlock(ctdb_db
, key
);
1767 DEBUG(DEBUG_ERR
,("Flags are wrong in tdb record in revokechild\n"));
1773 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1774 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1776 if (state
->status
== 0) {
1778 new_header
.flags
|= CTDB_REC_RO_REVOKE_COMPLETE
;
1780 DEBUG(DEBUG_NOTICE
, ("Revoke all delegations failed, retrying.\n"));
1781 new_header
.flags
&= ~CTDB_REC_RO_REVOKING_READONLY
;
1783 if (ctdb_ltdb_store(ctdb_db
, key
, &new_header
, new_data
) != 0) {
1784 ctdb_ltdb_unlock(ctdb_db
, key
);
1785 DEBUG(DEBUG_ERR
,("Failed to write new record in revokechild\n"));
1789 ctdb_ltdb_unlock(ctdb_db
, key
);
1796 int ctdb_start_revoke_ro_record(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_ltdb_header
*header
, TDB_DATA data
)
1799 struct revokechild_handle
*rc
;
1800 pid_t parent
= getpid();
1803 header
->flags
&= ~(CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
|CTDB_REC_RO_HAVE_READONLY
);
1804 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
1807 if ((rc
= talloc_zero(ctdb_db
, struct revokechild_handle
)) == NULL
) {
1808 DEBUG(DEBUG_ERR
,("Failed to allocate revokechild_handle\n"));
1812 tdata
= tdb_fetch(ctdb_db
->rottdb
, key
);
1813 if (tdata
.dsize
> 0) {
1817 tdata
.dptr
= talloc_memdup(rc
, tdata
.dptr
, tdata
.dsize
);
1823 rc
->ctdb_db
= ctdb_db
;
1827 talloc_set_destructor(rc
, revokechild_destructor
);
1829 rc
->key
.dsize
= key
.dsize
;
1830 rc
->key
.dptr
= talloc_memdup(rc
, key
.dptr
, key
.dsize
);
1831 if (rc
->key
.dptr
== NULL
) {
1832 DEBUG(DEBUG_ERR
,("Failed to allocate key for revokechild_handle\n"));
1839 DEBUG(DEBUG_ERR
,("Failed to allocate key for revokechild_handle\n"));
1845 rc
->child
= ctdb_fork(ctdb
);
1846 if (rc
->child
== (pid_t
)-1) {
1847 DEBUG(DEBUG_ERR
,("Failed to fork child for revokechild\n"));
1852 if (rc
->child
== 0) {
1855 debug_extra
= talloc_asprintf(NULL
, "revokechild-%s:", ctdb_db
->db_name
);
1857 prctl_set_comment("ctdb_revokechild");
1858 if (switch_from_server_to_client(ctdb
, "revokechild-%s", ctdb_db
->db_name
) != 0) {
1859 DEBUG(DEBUG_ERR
,("Failed to switch from server to client for revokechild process\n"));
1861 goto child_finished
;
1864 c
= ctdb_revoke_all_delegations(ctdb
, ctdb_db
, tdata
, key
, header
, data
);
1867 sys_write(rc
->fd
[1], &c
, 1);
1868 ctdb_wait_for_process_to_exit(parent
);
1874 set_close_on_exec(rc
->fd
[0]);
1876 /* This is an active revokechild child process */
1877 DLIST_ADD_END(ctdb_db
->revokechild_active
, rc
);
1879 rc
->fde
= tevent_add_fd(ctdb
->ev
, rc
, rc
->fd
[0], TEVENT_FD_READ
,
1880 revokechild_handler
, (void *)rc
);
1881 if (rc
->fde
== NULL
) {
1882 DEBUG(DEBUG_ERR
,("Failed to set up fd event for revokechild process\n"));
1885 tevent_fd_set_auto_close(rc
->fde
);
1890 int ctdb_add_revoke_deferred_call(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_req_header
*hdr
, deferred_requeue_fn fn
, void *call_context
)
1892 struct revokechild_handle
*rc
;
1893 struct revokechild_deferred_call
*deferred_call
;
1895 for (rc
= ctdb_db
->revokechild_active
; rc
; rc
= rc
->next
) {
1896 if (rc
->key
.dsize
== 0) {
1899 if (rc
->key
.dsize
!= key
.dsize
) {
1902 if (!memcmp(rc
->key
.dptr
, key
.dptr
, key
.dsize
)) {
1908 DEBUG(DEBUG_ERR
,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1912 deferred_call
= talloc(rc
, struct revokechild_deferred_call
);
1913 if (deferred_call
== NULL
) {
1914 DEBUG(DEBUG_ERR
,("Failed to allocate deferred call structure for revoking record\n"));
1918 deferred_call
->ctdb
= ctdb
;
1919 deferred_call
->hdr
= hdr
;
1920 deferred_call
->fn
= fn
;
1921 deferred_call
->ctx
= call_context
;
1923 talloc_set_destructor(deferred_call
, deferred_call_destructor
);
1924 talloc_steal(deferred_call
, hdr
);