/*
   ctdb_call protocol code

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
/*
  see http://wiki.samba.org/index.php/Samba_%26_Clustering for
  protocol design and packet details
*/
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
#include "../common/rb_tree.h"
struct ctdb_sticky_record {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	TALLOC_CTX *pindown;
};
/*
  find the ctdb_db from a db index
 */
struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
{
	struct ctdb_db_context *ctdb_db;

	for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
		if (ctdb_db->db_id == id) {
			break;
		}
	}
	return ctdb_db;
}
/*
  a variant of input packet that can be used in lock requeue
*/
static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
{
	struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
	ctdb_input_pkt(ctdb, hdr);
}
/*
  send an error reply
*/
static void ctdb_send_error(struct ctdb_context *ctdb,
			    struct ctdb_req_header *hdr, uint32_t status,
			    const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
static void ctdb_send_error(struct ctdb_context *ctdb,
			    struct ctdb_req_header *hdr, uint32_t status,
			    const char *fmt, ...)
{
	va_list ap;
	struct ctdb_reply_error *r;
	char *msg;
	int msglen, len;

	if (ctdb->methods == NULL) {
		DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
		return;
	}

	va_start(ap, fmt);
	msg = talloc_vasprintf(ctdb, fmt, ap);
	if (msg == NULL) {
		ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
	}
	va_end(ap);

	msglen = strlen(msg)+1;
	len = offsetof(struct ctdb_reply_error, msg);
	r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
				    struct ctdb_reply_error);
	CTDB_NO_MEMORY_FATAL(ctdb, r);

	r->hdr.destnode  = hdr->srcnode;
	r->hdr.reqid     = hdr->reqid;
	r->status        = status;
	r->msglen        = msglen;
	memcpy(&r->msg[0], msg, msglen);

	ctdb_queue_packet(ctdb, &r->hdr);

	talloc_free(msg);
}
/**
 * send a redirect reply
 *
 * The logic behind this function is this:
 *
 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
 * to its local ctdb (ctdb_request_call). If the node is not itself
 * the record's DMASTER, it first redirects the packet to the
 * record's LMASTER. The LMASTER then redirects the call packet to
 * the current DMASTER. Note that this works because of this: When
 * a record is migrated off a node, then the new DMASTER is stored
 * in the record's copy on the former DMASTER.
 */
static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
				    struct ctdb_db_context *ctdb_db,
				    TDB_DATA key,
				    struct ctdb_req_call *c,
				    struct ctdb_ltdb_header *header)
{
	uint32_t lmaster = ctdb_lmaster(ctdb, &key);

	c->hdr.destnode = lmaster;
	if (ctdb->pnn == lmaster) {
		c->hdr.destnode = header->dmaster;
	}
	c->hopcount++;

	if (c->hopcount%100 > 95) {
		DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
			"key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
			"header->dmaster:%d dst:%d\n",
			c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
			c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
			header->dmaster, c->hdr.destnode));
	}

	ctdb_queue_packet(ctdb, &c->hdr);
}
/*
  send a dmaster reply

  caller must have the chainlock before calling this routine. Caller must be
  the lmaster
*/
static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
				    struct ctdb_ltdb_header *header,
				    TDB_DATA key, TDB_DATA data,
				    uint32_t new_dmaster,
				    uint32_t reqid)
{
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	struct ctdb_reply_dmaster *r;
	TALLOC_CTX *tmp_ctx;
	int ret, len;

	if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
		DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
		return;
	}

	header->dmaster = new_dmaster;
	ret = ctdb_ltdb_store(ctdb_db, key, header, data);
	if (ret != 0) {
		ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
		return;
	}

	if (ctdb->methods == NULL) {
		ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
		return;
	}

	/* put the packet on a temporary context, allowing us to safely free
	   it below even if ctdb_reply_dmaster() has freed it already */
	tmp_ctx = talloc_new(ctdb);

	/* send the CTDB_REPLY_DMASTER */
	len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
	r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
				    struct ctdb_reply_dmaster);
	CTDB_NO_MEMORY_FATAL(ctdb, r);

	r->hdr.destnode  = new_dmaster;
	r->hdr.reqid     = reqid;
	r->rsn           = header->rsn;
	r->keylen        = key.dsize;
	r->datalen       = data.dsize;
	r->db_id         = ctdb_db->db_id;
	memcpy(&r->data[0], key.dptr, key.dsize);
	memcpy(&r->data[key.dsize], data.dptr, data.dsize);
	memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));

	ctdb_queue_packet(ctdb, &r->hdr);

	talloc_free(tmp_ctx);
}
/*
  send a dmaster request (give another node the dmaster for a record)

  This is always sent to the lmaster, which ensures that the lmaster
  always knows who the dmaster is. The lmaster will then send a
  CTDB_REPLY_DMASTER to the new dmaster
*/
static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
				   struct ctdb_req_call *c,
				   struct ctdb_ltdb_header *header,
				   TDB_DATA *key, TDB_DATA *data)
{
	struct ctdb_req_dmaster *r;
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	int len;
	uint32_t lmaster = ctdb_lmaster(ctdb, key);

	if (ctdb->methods == NULL) {
		ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
		return;
	}

	if (data->dsize != 0) {
		header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
	}

	if (lmaster == ctdb->pnn) {
		ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
					c->hdr.srcnode, c->hdr.reqid);
		return;
	}

	len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
			+ sizeof(uint32_t);
	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
				    struct ctdb_req_dmaster);
	CTDB_NO_MEMORY_FATAL(ctdb, r);
	r->hdr.destnode  = lmaster;
	r->hdr.reqid     = c->hdr.reqid;
	r->db_id         = c->db_id;
	r->rsn           = header->rsn;
	r->dmaster       = c->hdr.srcnode;
	r->keylen        = key->dsize;
	r->datalen       = data->dsize;
	memcpy(&r->data[0], key->dptr, key->dsize);
	memcpy(&r->data[key->dsize], data->dptr, data->dsize);
	memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));

	header->dmaster = c->hdr.srcnode;
	if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
		ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
	}

	ctdb_queue_packet(ctdb, &r->hdr);
}
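
/*
  timeout handler for a "pindown" context: the pindown period for a
  sticky record has expired, so free the pindown context and allow the
  record to be migrated away from this node again
*/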
static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
				       struct timeval t, void *private_data)
{
	struct ctdb_sticky_record *sr = talloc_get_type(private_data,
						       struct ctdb_sticky_record);

	DEBUG(DEBUG_ERR,("Pindown timeout db:%s  unstick record\n", sr->ctdb_db->db_name));
	if (sr->pindown != NULL) {
		talloc_free(sr->pindown);
		sr->pindown = NULL;
	}
}
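
/*
  if this record is registered as sticky on this database, pin it down:
  allocate a pindown context and arm a timeout of tunable.sticky_pindown
  milliseconds during which further migration requests for the record
  are deferred
*/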
static int
ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
	uint32_t *k;
	struct ctdb_sticky_record *sr;

	k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
	if (k == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
		talloc_free(tmp_ctx);
		return -1;
	}

	k[0] = (key.dsize + 3) / 4 + 1;
	memcpy(&k[1], key.dptr, key.dsize);

	sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
	if (sr == NULL) {
		talloc_free(tmp_ctx);
		return 0;
	}

	talloc_free(tmp_ctx);

	if (sr->pindown == NULL) {
		DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
		sr->pindown = talloc_new(sr);
		if (sr->pindown == NULL) {
			DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
			return -1;
		}
		event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
	}

	return 0;
}
/*
  called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
  gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.

  must be called with the chainlock held. This function releases the chainlock
*/
static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
				struct ctdb_req_header *hdr,
				TDB_DATA key, TDB_DATA data,
				uint64_t rsn, uint32_t record_flags)
{
	struct ctdb_call_state *state;
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	struct ctdb_ltdb_header header;
	int ret;

	DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));

	ZERO_STRUCT(header);
	header.rsn = rsn;
	header.dmaster = ctdb->pnn;
	header.flags = record_flags;

	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);

	if (state) {
		if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
			/*
			 * We temporarily add the VACUUM_MIGRATED flag to
			 * the record flags, so that ctdb_ltdb_store can
			 * decide whether the record should be stored or
			 * deleted.
			 */
			header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
		}
	}

	if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
		ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	/* we just became DMASTER and this database is "sticky",
	   see if the record is flagged as "hot" and set up a pin-down
	   context to stop migrations for a little while if so
	*/
	if (ctdb_db->sticky) {
		ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
	}

	if (state == NULL) {
		DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
			 ctdb->pnn, hdr->reqid, hdr->srcnode));

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
		DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	if (hdr->reqid != state->reqid) {
		/* we found a record but it was the wrong one */
		DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		return;
	}

	ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);

	ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
	}

	state->state = CTDB_CALL_DONE;
	if (state->async.fn) {
		state->async.fn(state);
	}
}
/*
  called when a CTDB_REQ_DMASTER packet comes in

  this comes into the lmaster for a record when the current dmaster
  wants to give up the dmaster role and give it to someone else
*/
void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
	struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
	TDB_DATA key, data, data2;
	struct ctdb_ltdb_header header;
	struct ctdb_db_context *ctdb_db;
	uint32_t record_flags = 0;
	size_t len;
	int ret;

	key.dptr = c->data;
	key.dsize = c->keylen;
	data.dptr = c->data + c->keylen;
	data.dsize = c->datalen;
	len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
			+ sizeof(uint32_t);
	if (len <= c->hdr.length) {
		record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
	}

	ctdb_db = find_ctdb_db(ctdb, c->db_id);
	if (!ctdb_db) {
		ctdb_send_error(ctdb, hdr, -1,
				"Unknown database in request. db_id==0x%08x",
				c->db_id);
		return;
	}

	/* fetch the current record */
	ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
					   ctdb_call_input_pkt, ctdb, false);
	if (ret == -1) {
		ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
		return;
	}
	if (ret == -2) {
		DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
		return;
	}

	if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
		DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
			 ctdb->pnn, ctdb_lmaster(ctdb, &key),
			 hdr->generation, ctdb->vnn_map->generation));
		ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
	}

	DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
		 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));

	/* its a protocol error if the sending node is not the current dmaster */
	if (header.dmaster != hdr->srcnode) {
		DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
			 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
			 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
			 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
			 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
		if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
			DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));

			ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
			ctdb_ltdb_unlock(ctdb_db, key);
			return;
		}
	}

	if (header.rsn > c->rsn) {
		DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
			 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
			 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
			 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
	}

	/* use the rsn from the sending node */
	header.rsn = c->rsn;

	/* store the record flags from the sending node */
	header.flags = record_flags;

	/* check if the new dmaster is the lmaster, in which case we
	   skip the dmaster reply */
	if (c->dmaster == ctdb->pnn) {
		ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
	} else {
		ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);

		ret = ctdb_ltdb_unlock(ctdb_db, key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
	}
}
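
/*
  timeout handler: the sticky registration for a record has expired,
  so free the sticky record structure
*/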
static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
				       struct timeval t, void *private_data)
{
	struct ctdb_sticky_record *sr = talloc_get_type(private_data,
						       struct ctdb_sticky_record);
	talloc_free(sr);
}
static void *ctdb_make_sticky_record_callback(void *parm, void *data)
{
	if (data) {
		DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
		talloc_free(data);
	}
	return parm;
}
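
/*
  register a record as "sticky" in the per-database rb-tree for
  tunable.sticky_duration seconds, so that a hot record stops being
  migrated from node to node on every access
*/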
static int
ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
	uint32_t *k;
	struct ctdb_sticky_record *sr;

	k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
	if (k == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
		talloc_free(tmp_ctx);
		return -1;
	}

	k[0] = (key.dsize + 3) / 4 + 1;
	memcpy(&k[1], key.dptr, key.dsize);

	sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
	if (sr != NULL) {
		talloc_free(tmp_ctx);
		return 0;
	}

	sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
	if (sr == NULL) {
		talloc_free(tmp_ctx);
		DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
		return -1;
	}

	sr->ctdb    = ctdb;
	sr->ctdb_db = ctdb_db;
	sr->pindown = NULL;

	DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
			 ctdb->tunable.sticky_duration,
			 ctdb_db->db_name, ctdb_hash(&key)));

	trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);

	event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);

	talloc_free(tmp_ctx);
	return 0;
}
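
/*
  state used to defer and later requeue CTDB_REQ_CALL packets that
  arrive for a record while it is pinned down; freeing the pindown
  context triggers the destructor below, which requeues the deferred
  packets via a zero timeout event
*/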
struct pinned_down_requeue_handle {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
};

struct pinned_down_deferred_call {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
};

static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
		       struct timeval t, void *private_data)
{
	struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
	struct ctdb_context *ctdb = handle->ctdb;

	talloc_steal(ctdb, handle->hdr);
	ctdb_call_input_pkt(ctdb, handle->hdr);

	talloc_free(handle);
}

static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
{
	struct ctdb_context *ctdb = pinned_down->ctdb;
	struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);

	handle->ctdb = pinned_down->ctdb;
	handle->hdr  = pinned_down->hdr;
	talloc_steal(handle, handle->hdr);

	event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);

	return 0;
}
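
/*
  defer a request for a record that is currently pinned down.
  Returns 0 if the request was deferred (the caller must not process it
  any further) and -1 if the record is not pinned down on this node
*/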
static int
ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
{
	TALLOC_CTX *tmp_ctx = talloc_new(NULL);
	uint32_t *k;
	struct ctdb_sticky_record *sr;
	struct pinned_down_deferred_call *pinned_down;

	k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
	if (k == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
		talloc_free(tmp_ctx);
		return -1;
	}

	k[0] = (key.dsize + 3) / 4 + 1;
	memcpy(&k[1], key.dptr, key.dsize);

	sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
	if (sr == NULL) {
		talloc_free(tmp_ctx);
		return -1;
	}

	talloc_free(tmp_ctx);

	if (sr->pindown == NULL) {
		return -1;
	}

	pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
	if (pinned_down == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
		return -1;
	}

	pinned_down->ctdb = ctdb;
	pinned_down->hdr  = hdr;

	talloc_set_destructor(pinned_down, pinned_down_destructor);
	talloc_steal(pinned_down, hdr);

	return 0;
}
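
/*
  update the per-database "hot keys" statistics with the hop count seen
  for this key; the entry with the smallest count is kept at index 0 so
  new candidates can be compared against it cheaply
*/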
static void
ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
{
	int i, id;

	/* smallest value is always at index 0 */
	if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
		return;
	}

	/* see if we already know this key */
	for (i = 0; i < MAX_HOT_KEYS; i++) {
		if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
			continue;
		}
		if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
			continue;
		}
		/* found an entry for this key */
		if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
			return;
		}
		ctdb_db->statistics.hot_keys[i].count = hopcount;
		goto sort_keys;
	}

	if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
		id = ctdb_db->statistics.num_hot_keys;
		ctdb_db->statistics.num_hot_keys++;
	} else {
		id = 0;
	}

	if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
		talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
	}
	ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
	ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
	ctdb_db->statistics.hot_keys[id].count = hopcount;
	DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
			    ctdb_db->db_name, ctdb_hash(&key), id, hopcount));

sort_keys:
	for (i = 1; i < MAX_HOT_KEYS; i++) {
		if (ctdb_db->statistics.hot_keys[i].count == 0) {
			continue;
		}
		if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
			hopcount = ctdb_db->statistics.hot_keys[i].count;
			ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
			ctdb_db->statistics.hot_keys[0].count = hopcount;

			key = ctdb_db->statistics.hot_keys[i].key;
			ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
			ctdb_db->statistics.hot_keys[0].key = key;
		}
	}
}
/*
  called when a CTDB_REQ_CALL packet comes in
*/
void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
	struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
	TDB_DATA data;
	struct ctdb_reply_call *r;
	int ret, len;
	struct ctdb_ltdb_header header;
	struct ctdb_call *call;
	struct ctdb_db_context *ctdb_db;
	int tmp_count, bucket;

	if (ctdb->methods == NULL) {
		DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
		return;
	}

	ctdb_db = find_ctdb_db(ctdb, c->db_id);
	if (!ctdb_db) {
		ctdb_send_error(ctdb, hdr, -1,
				"Unknown database in request. db_id==0x%08x",
				c->db_id);
		return;
	}

	call = talloc(hdr, struct ctdb_call);
	CTDB_NO_MEMORY_FATAL(ctdb, call);

	call->call_id  = c->callid;
	call->key.dptr = c->data;
	call->key.dsize = c->keylen;
	call->call_data.dptr = c->data + c->keylen;
	call->call_data.dsize = c->calldatalen;
	call->reply_data.dptr  = NULL;
	call->reply_data.dsize = 0;

	/* If this record is pinned down we should defer the
	   request until the pindown times out
	*/
	if (ctdb_db->sticky) {
		if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
			DEBUG(DEBUG_WARNING,
			      ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
			talloc_free(call);
			return;
		}
	}

	/* determine if we are the dmaster for this key. This also
	   fetches the record data (if any), thus avoiding a 2nd fetch of the data
	   if the call will be answered locally */
	ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
					   ctdb_call_input_pkt, ctdb, false);
	if (ret == -1) {
		ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
		talloc_free(call);
		return;
	}
	if (ret == -2) {
		DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
		talloc_free(call);
		return;
	}

	/* Dont do READONLY if we dont have a tracking database */
	if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
		c->flags &= ~CTDB_WANT_READONLY;
	}

	if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
		header.flags &= ~CTDB_REC_RO_FLAGS;
		CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
		CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
		if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
			ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
		}
		/* and clear out the tracking data */
		if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
			DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
		}
	}

	/* if we are revoking, we must defer all other calls until the revoke
	 * has finished
	 */
	if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
		talloc_free(data.dptr);
		ret = ctdb_ltdb_unlock(ctdb_db, call->key);

		if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
			ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
		}
		talloc_free(call);
		return;
	}

	/*
	 * If we are not the dmaster and are not hosting any delegations,
	 * then we redirect the request to the node than can answer it
	 * (the lmaster or the dmaster).
	 */
	if ((header.dmaster != ctdb->pnn)
	    && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
		talloc_free(data.dptr);
		ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);

		ret = ctdb_ltdb_unlock(ctdb_db, call->key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}
		talloc_free(call);
		return;
	}

	if ( (!(c->flags & CTDB_WANT_READONLY))
	&& (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
		header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
		if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
			ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
		}
		ret = ctdb_ltdb_unlock(ctdb_db, call->key);

		if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
			ctdb_fatal(ctdb, "Failed to start record revoke");
		}
		talloc_free(data.dptr);

		if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
			ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
		}
		talloc_free(call);

		return;
	}

	/* If this is the first request for delegation. bump rsn and set
	 * the delegations flag
	 */
	if ((c->flags & CTDB_WANT_READONLY)
	&&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
	&&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
		header.rsn     += 3;
		header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
		if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
			ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
		}
	}
	if ((c->flags & CTDB_WANT_READONLY)
	&&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
		TDB_DATA tdata;

		tdata = tdb_fetch(ctdb_db->rottdb, call->key);
		if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
			ctdb_fatal(ctdb, "Failed to add node to trackingdb");
		}
		if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
			ctdb_fatal(ctdb, "Failed to store trackingdb data");
		}
		free(tdata.dptr);

		ret = ctdb_ltdb_unlock(ctdb_db, call->key);
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
		}

		len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
		r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
					    struct ctdb_reply_call);
		CTDB_NO_MEMORY_FATAL(ctdb, r);
		r->hdr.destnode  = c->hdr.srcnode;
		r->hdr.reqid     = c->hdr.reqid;
		r->status        = 0;
		r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
		header.rsn      -= 2;
		header.flags   |= CTDB_REC_RO_HAVE_READONLY;
		header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
		memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));

		if (data.dsize) {
			memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
		}

		ctdb_queue_packet(ctdb, &r->hdr);
		CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
		CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);

		talloc_free(r);
		talloc_free(call);
		return;
	}

	CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
	tmp_count = c->hopcount;
	bucket = 0;
	while (tmp_count) {
		tmp_count >>= 2;
		bucket++;
	}
	if (bucket >= MAX_COUNT_BUCKETS) {
		bucket = MAX_COUNT_BUCKETS - 1;
	}
	CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
	CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
	ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);

	/* If this database supports sticky records, then check if the
	   hopcount is big. If it is it means the record is hot and we
	   should make it sticky.
	*/
	if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
		ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
	}

	/* Try if possible to migrate the record off to the caller node.
	 * From the clients perspective a fetch of the data is just as
	 * expensive as a migration.
	 */
	if (c->hdr.srcnode != ctdb->pnn) {
		if (ctdb_db->persistent_state) {
			DEBUG(DEBUG_INFO, (__location__ " refusing migration"
			      " of key %s while transaction is active\n",
			      (char *)call->key.dptr));
		} else {
			DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
				 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
			ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
			talloc_free(data.dptr);

			ret = ctdb_ltdb_unlock(ctdb_db, call->key);
			if (ret != 0) {
				DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
			}
			return;
		}
	}

	ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
		call->status = -1;
	}

	ret = ctdb_ltdb_unlock(ctdb_db, call->key);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
	}

	len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
				    struct ctdb_reply_call);
	CTDB_NO_MEMORY_FATAL(ctdb, r);
	r->hdr.destnode  = hdr->srcnode;
	r->hdr.reqid     = hdr->reqid;
	r->status        = call->status;
	r->datalen       = call->reply_data.dsize;
	if (call->reply_data.dsize) {
		memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
	}

	ctdb_queue_packet(ctdb, &r->hdr);

	talloc_free(r);
	talloc_free(call);
}
/**
 * called when a CTDB_REPLY_CALL packet comes in
 *
 * This packet comes in response to a CTDB_REQ_CALL request packet. It
 * contains any reply data from the call
 */
void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
	struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
	struct ctdb_call_state *state;

	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
	if (state == NULL) {
		DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
		return;
	}

	if (hdr->reqid != state->reqid) {
		/* we found a record but it was the wrong one */
		DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
		return;
	}

	/* read only delegation processing */
	/* If we got a FETCH_WITH_HEADER we should check if this is a ro
	 * delegation since we may need to update the record header
	 */
	if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
		struct ctdb_db_context *ctdb_db = state->ctdb_db;
		struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
		struct ctdb_ltdb_header oldheader;
		TDB_DATA key, data, olddata;
		int ret;

		if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
			goto finished_ro;
		}

		key.dsize = state->c->keylen;
		key.dptr  = state->c->data;
		ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
				     ctdb_call_input_pkt, ctdb, false);
		if (ret == -2) {
			return;
		}
		if (ret != 0) {
			DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
			return;
		}

		ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
		if (ret != 0) {
			DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
			ctdb_ltdb_unlock(ctdb_db, key);
			goto finished_ro;
		}

		if (header->rsn <= oldheader.rsn) {
			ctdb_ltdb_unlock(ctdb_db, key);
			goto finished_ro;
		}

		if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
			ctdb_ltdb_unlock(ctdb_db, key);
			goto finished_ro;
		}

		data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
		data.dptr  = &c->data[sizeof(struct ctdb_ltdb_header)];
		ret = ctdb_ltdb_store(ctdb_db, key, header, data);
		if (ret != 0) {
			DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
			ctdb_ltdb_unlock(ctdb_db, key);
			goto finished_ro;
		}

		ctdb_ltdb_unlock(ctdb_db, key);
	}
finished_ro:

	state->call->reply_data.dptr = c->data;
	state->call->reply_data.dsize = c->datalen;
	state->call->status = c->status;

	talloc_steal(state, c);

	state->state = CTDB_CALL_DONE;
	if (state->async.fn) {
		state->async.fn(state);
	}
}
/**
 * called when a CTDB_REPLY_DMASTER packet comes in
 *
 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
 * request packet. It means that the current dmaster wants to give us
 * the dmaster role for this record.
 */
void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
	struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
	struct ctdb_db_context *ctdb_db;
	TDB_DATA key, data;
	uint32_t record_flags = 0;
	size_t len;
	int ret;

	ctdb_db = find_ctdb_db(ctdb, c->db_id);
	if (ctdb_db == NULL) {
		DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
		return;
	}

	key.dptr = c->data;
	key.dsize = c->keylen;
	data.dptr = &c->data[key.dsize];
	data.dsize = c->datalen;
	len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
		+ sizeof(uint32_t);
	if (len <= c->hdr.length) {
		record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
	}

	ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
				     ctdb_call_input_pkt, ctdb, false);
	if (ret == -2) {
		return;
	}
	if (ret != 0) {
		DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
		return;
	}

	ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
}
/*
  called when a CTDB_REPLY_ERROR packet comes in
*/
void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
	struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
	struct ctdb_call_state *state;

	state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
	if (state == NULL) {
		DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
			 ctdb->pnn, hdr->reqid));
		return;
	}

	if (hdr->reqid != state->reqid) {
		/* we found a record but it was the wrong one */
		DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
		return;
	}

	talloc_steal(state, c);

	state->state  = CTDB_CALL_ERROR;
	state->errmsg = (char *)c->msg;
	if (state->async.fn) {
		state->async.fn(state);
	}
}
/*
  destroy a ctdb_call
*/
static int ctdb_call_destructor(struct ctdb_call_state *state)
{
	DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
	ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
	return 0;
}
/*
  called when a ctdb_call needs to be resent after a reconfigure event
*/
static void ctdb_call_resend(struct ctdb_call_state *state)
{
	struct ctdb_context *ctdb = state->ctdb_db->ctdb;

	state->generation = ctdb->vnn_map->generation;

	/* use a new reqid, in case the old reply does eventually come in */
	ctdb_reqid_remove(ctdb, state->reqid);
	state->reqid = ctdb_reqid_new(ctdb, state);
	state->c->hdr.reqid = state->reqid;

	/* update the generation count for this request, so its valid with the new vnn_map */
	state->c->hdr.generation = state->generation;

	/* send the packet to ourselves, it will be redirected appropriately */
	state->c->hdr.destnode = ctdb->pnn;

	ctdb_queue_packet(ctdb, &state->c->hdr);
	DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
}
/*
  resend all pending calls on recovery
 */
void ctdb_call_resend_all(struct ctdb_context *ctdb)
{
	struct ctdb_call_state *state, *next;

	for (state = ctdb->pending_calls; state; state = next) {
		next = state->next;
		ctdb_call_resend(state);
	}
}
/*
  this allows the caller to setup a async.fn
*/
static void call_local_trigger(struct event_context *ev, struct timed_event *te,
		       struct timeval t, void *private_data)
{
	struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);

	if (state->async.fn) {
		state->async.fn(state);
	}
}
/*
  construct an event driven local ctdb_call

  this is used so that locally processed ctdb_call requests are processed
  in an event driven manner
*/
struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
					     struct ctdb_call *call,
					     struct ctdb_ltdb_header *header,
					     TDB_DATA *data)
{
	struct ctdb_call_state *state;
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	int ret;

	state = talloc_zero(ctdb_db, struct ctdb_call_state);
	CTDB_NO_MEMORY_NULL(ctdb, state);

	talloc_steal(state, data->dptr);

	state->state = CTDB_CALL_DONE;
	state->call  = talloc(state, struct ctdb_call);
	CTDB_NO_MEMORY_NULL(ctdb, state->call);
	*(state->call) = *call;
	state->ctdb_db = ctdb_db;

	ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
	if (ret != 0) {
		DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
	}

	event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);

	return state;
}
/*
  make a remote ctdb call - async send. Called in daemon context.

  This constructs a ctdb_call request and queues it for processing.
  This call never blocks.
*/
struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
						     struct ctdb_call *call,
						     struct ctdb_ltdb_header *header)
{
	uint32_t len;
	struct ctdb_call_state *state;
	struct ctdb_context *ctdb = ctdb_db->ctdb;

	if (ctdb->methods == NULL) {
		DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
		return NULL;
	}

	state = talloc_zero(ctdb_db, struct ctdb_call_state);
	CTDB_NO_MEMORY_NULL(ctdb, state);
	state->call = talloc(state, struct ctdb_call);
	CTDB_NO_MEMORY_NULL(ctdb, state->call);

	state->reqid = ctdb_reqid_new(ctdb, state);
	state->ctdb_db = ctdb_db;
	talloc_set_destructor(state, ctdb_call_destructor);

	len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
	state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
					   struct ctdb_req_call);
	CTDB_NO_MEMORY_NULL(ctdb, state->c);
	state->c->hdr.destnode  = header->dmaster;

	/* this limits us to 16k outstanding messages - not unreasonable */
	state->c->hdr.reqid     = state->reqid;
	state->c->flags         = call->flags;
	state->c->db_id         = ctdb_db->db_id;
	state->c->callid        = call->call_id;
	state->c->hopcount      = 0;
	state->c->keylen        = call->key.dsize;
	state->c->calldatalen   = call->call_data.dsize;
	memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
	memcpy(&state->c->data[call->key.dsize],
	       call->call_data.dptr, call->call_data.dsize);
	*(state->call)              = *call;
	state->call->call_data.dptr = &state->c->data[call->key.dsize];
	state->call->key.dptr       = &state->c->data[0];

	state->state  = CTDB_CALL_WAIT;
	state->generation = ctdb->vnn_map->generation;

	DLIST_ADD(ctdb->pending_calls, state);

	ctdb_queue_packet(ctdb, &state->c->hdr);

	return state;
}
/*
  make a remote ctdb call - async recv - called in daemon context

  This is called when the program wants to wait for a ctdb_call to complete and get the
  results. This call will block unless the call has already completed.
*/
int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
{
	while (state->state < CTDB_CALL_DONE) {
		event_loop_once(state->ctdb_db->ctdb->ev);
	}
	if (state->state != CTDB_CALL_DONE) {
		ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
		talloc_free(state);
		return -1;
	}

	if (state->call->reply_data.dsize) {
		call->reply_data.dptr = talloc_memdup(call,
						      state->call->reply_data.dptr,
						      state->call->reply_data.dsize);
		call->reply_data.dsize = state->call->reply_data.dsize;
	} else {
		call->reply_data.dptr = NULL;
		call->reply_data.dsize = 0;
	}
	call->status = state->call->status;
	talloc_free(state);

	return 0;
}
/*
  send a keepalive packet to the other node
*/
void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
{
	struct ctdb_req_keepalive *r;

	if (ctdb->methods == NULL) {
		DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
		return;
	}

	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
				    sizeof(struct ctdb_req_keepalive),
				    struct ctdb_req_keepalive);
	CTDB_NO_MEMORY_FATAL(ctdb, r);
	r->hdr.destnode  = destnode;
	r->hdr.reqid     = 0;

	CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);

	ctdb_queue_packet(ctdb, &r->hdr);

	talloc_free(r);
}
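
/*
  revoking of read-only delegations is handled by a helper child process;
  the structures below track active revoke children and the calls that
  have been deferred until a revoke completes
*/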
struct revokechild_deferred_call {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
	deferred_requeue_fn fn;
	void *ctx;
};

struct revokechild_handle {
	struct revokechild_handle *next, *prev;
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	struct fd_event *fde;
	int status;
	int fd[2];
	pid_t child;
	TDB_DATA key;
};

struct revokechild_requeue_handle {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
	deferred_requeue_fn fn;
	void *ctx;
};
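
/*
  when a deferred call structure is freed (i.e. the revoke has
  finished), its destructor schedules the original packet to be fed
  back into ctdb via deferred_call_requeue
*/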
static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
		       struct timeval t, void *private_data)
{
	struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);

	requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
	talloc_free(requeue_handle);
}

static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
{
	struct ctdb_context *ctdb = deferred_call->ctdb;
	struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
	struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;

	requeue_handle->ctdb = ctdb;
	requeue_handle->hdr  = deferred_call->hdr;
	requeue_handle->fn   = deferred_call->fn;
	requeue_handle->ctx  = deferred_call->ctx;
	talloc_steal(requeue_handle, requeue_handle->hdr);

	/* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
	event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);

	return 0;
}
static int revokechild_destructor(struct revokechild_handle *rc)
{
	if (rc->fde != NULL) {
		talloc_free(rc->fde);
	}

	if (rc->fd[0] != -1) {
		close(rc->fd[0]);
	}
	if (rc->fd[1] != -1) {
		close(rc->fd[1]);
	}
	ctdb_kill(rc->ctdb, rc->child, SIGKILL);

	DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
	return 0;
}

static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
			     uint16_t flags, void *private_data)
{
	struct revokechild_handle *rc = talloc_get_type(private_data,
						     struct revokechild_handle);
	int ret;
	char c;

	ret = sys_read(rc->fd[0], &c, 1);
	if (ret != 1) {
		DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
		rc->status = -1;
		talloc_free(rc);
		return;
	}
	if (c != 0) {
		DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
		rc->status = -1;
		talloc_free(rc);
		return;
	}

	talloc_free(rc);
}
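
/*
  state for revoking all read-only delegations of a record: an update
  record control is sent to every node listed in the tracking database
  and the results are collected by the callbacks below
*/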
struct ctdb_revoke_state {
	struct ctdb_db_context *ctdb_db;
	TDB_DATA key;
	struct ctdb_ltdb_header *header;
	TDB_DATA data;
	int count;
	int status;
	int finished;
};

static void update_record_cb(struct ctdb_client_control_state *state)
{
	struct ctdb_revoke_state *revoke_state;
	int ret;
	int32_t res;

	if (state == NULL) {
		return;
	}
	revoke_state = state->async.private_data;

	state->async.fn = NULL;
	ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
	if ((ret != 0) || (res != 0)) {
		DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
		revoke_state->status = -1;
	}

	revoke_state->count--;
	if (revoke_state->count <= 0) {
		revoke_state->finished = 1;
	}
}

static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
{
	struct ctdb_revoke_state *revoke_state = private_data;
	struct ctdb_client_control_state *state;

	state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
	if (state == NULL) {
		DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
		revoke_state->status = -1;
		return;
	}
	state->async.fn           = update_record_cb;
	state->async.private_data = revoke_state;

	revoke_state->count++;
}

static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
			      struct timeval yt, void *private_data)
{
	struct ctdb_revoke_state *state = private_data;

	DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
	state->finished = 1;
	state->status   = -1;
}

static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
{
	struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
	struct ctdb_ltdb_header new_header;
	TDB_DATA new_data;

	state->ctdb_db = ctdb_db;
	state->key     = key;
	state->header  = header;
	state->data    = data;

	ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);

	event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);

	while (state->finished == 0) {
		event_loop_once(ctdb->ev);
	}

	if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
		DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
		talloc_free(state);
		return -1;
	}
	if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
		ctdb_ltdb_unlock(ctdb_db, key);
		DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
		talloc_free(state);
		return -1;
	}
	if (new_header.rsn > header->rsn) {
		ctdb_ltdb_unlock(ctdb_db, key);
		DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
		talloc_free(state);
		return -1;
	}
	if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
		ctdb_ltdb_unlock(ctdb_db, key);
		DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
		talloc_free(state);
		return -1;
	}

	/*
	 * If revoke on all nodes succeed, revoke is complete. Otherwise,
	 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
	 */
	if (state->status == 0) {
		new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
	} else {
		DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
		new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
	}
	if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
		ctdb_ltdb_unlock(ctdb_db, key);
		DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
		talloc_free(state);
		return -1;
	}
	ctdb_ltdb_unlock(ctdb_db, key);

	talloc_free(state);
	return 0;
}
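
/*
  start revoking all read-only delegations for a record: fork a helper
  child that performs the revoke and reports its status back to the
  main daemon over a pipe
*/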
int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
{
	TDB_DATA tdata;
	struct revokechild_handle *rc;
	pid_t parent = getpid();
	int ret;

	header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
	header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;

	if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
		return -1;
	}

	tdata = tdb_fetch(ctdb_db->rottdb, key);
	if (tdata.dsize > 0) {
		uint8_t *tmp;

		tmp = tdata.dptr;
		tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
		free(tmp);
	}

	rc->status    = 0;
	rc->ctdb      = ctdb;
	rc->ctdb_db   = ctdb_db;
	rc->fd[0]     = -1;
	rc->fd[1]     = -1;

	talloc_set_destructor(rc, revokechild_destructor);

	rc->key.dsize = key.dsize;
	rc->key.dptr  = talloc_memdup(rc, key.dptr, key.dsize);
	if (rc->key.dptr == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
		talloc_free(rc);
		return -1;
	}

	ret = pipe(rc->fd);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
		talloc_free(rc);
		return -1;
	}

	rc->child = ctdb_fork(ctdb);
	if (rc->child == (pid_t)-1) {
		DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
		talloc_free(rc);
		return -1;
	}

	if (rc->child == 0) {
		char c = 0;
		close(rc->fd[0]);
		debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);

		ctdb_set_process_name("ctdb_revokechild");
		if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
			DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
			c = 1;
			goto child_finished;
		}

		c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);

child_finished:
		sys_write(rc->fd[1], &c, 1);
		/* make sure we die when our parent dies */
		while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
			sleep(5);
		}
		_exit(0);
	}

	close(rc->fd[1]);
	rc->fd[1] = -1;
	set_close_on_exec(rc->fd[0]);

	/* This is an active revokechild child process */
	DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);

	rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
			       EVENT_FD_READ, revokechild_handler,
			       (void *)rc);
	if (rc->fde == NULL) {
		DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
		talloc_free(rc);
		return -1;
	}
	tevent_fd_set_auto_close(rc->fde);

	return 0;
}
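
/*
  defer a call for a record that currently has its read-only
  delegations being revoked; the call is requeued once the matching
  revokechild_handle (and hence the deferred call structure) is freed
*/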
int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
{
	struct revokechild_handle *rc;
	struct revokechild_deferred_call *deferred_call;

	for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
		if (rc->key.dsize == 0) {
			continue;
		}
		if (rc->key.dsize != key.dsize) {
			continue;
		}
		if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
			break;
		}
	}

	if (rc == NULL) {
		DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
		return -1;
	}

	deferred_call = talloc(rc, struct revokechild_deferred_call);
	if (deferred_call == NULL) {
		DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
		return -1;
	}

	deferred_call->ctdb = ctdb;
	deferred_call->hdr  = hdr;
	deferred_call->fn   = fn;
	deferred_call->ctx  = call_context;

	talloc_set_destructor(deferred_call, deferred_call_destructor);
	talloc_steal(deferred_call, hdr);

	return 0;
}