/*
   ctdb_call protocol code

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
/*
  see http://wiki.samba.org/index.php/Samba_%26_Clustering for
  protocol design and packet details
*/
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record
{
32 struct ctdb_context
*ctdb
;
33 struct ctdb_db_context
*ctdb_db
;
38 find the ctdb_db from a db index
40 struct ctdb_db_context
*find_ctdb_db(struct ctdb_context
*ctdb
, uint32_t id
)
42 struct ctdb_db_context
*ctdb_db
;
44 for (ctdb_db
=ctdb
->db_list
; ctdb_db
; ctdb_db
=ctdb_db
->next
) {
45 if (ctdb_db
->db_id
== id
) {
53 a varient of input packet that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p
, struct ctdb_req_header
*hdr
)
57 struct ctdb_context
*ctdb
= talloc_get_type(p
, struct ctdb_context
);
58 ctdb_input_pkt(ctdb
, hdr
);
65 static void ctdb_send_error(struct ctdb_context
*ctdb
,
66 struct ctdb_req_header
*hdr
, uint32_t status
,
67 const char *fmt
, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context
*ctdb
,
69 struct ctdb_req_header
*hdr
, uint32_t status
,
73 struct ctdb_reply_error
*r
;
77 if (ctdb
->methods
== NULL
) {
78 DEBUG(DEBUG_INFO
,(__location__
" Failed to send error. Transport is DOWN\n"));
83 msg
= talloc_vasprintf(ctdb
, fmt
, ap
);
85 ctdb_fatal(ctdb
, "Unable to allocate error in ctdb_send_error\n");
89 msglen
= strlen(msg
)+1;
90 len
= offsetof(struct ctdb_reply_error
, msg
);
91 r
= ctdb_transport_allocate(ctdb
, msg
, CTDB_REPLY_ERROR
, len
+ msglen
,
92 struct ctdb_reply_error
);
93 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
95 r
->hdr
.destnode
= hdr
->srcnode
;
96 r
->hdr
.reqid
= hdr
->reqid
;
99 memcpy(&r
->msg
[0], msg
, msglen
);
101 ctdb_queue_packet(ctdb
, &r
->hdr
);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. Note that this works because of this: When
117 * a record is migrated off a node, then the new DMASTER is stored
118 * in the record's copy on the former DMASTER.
120 static void ctdb_call_send_redirect(struct ctdb_context
*ctdb
,
121 struct ctdb_db_context
*ctdb_db
,
123 struct ctdb_req_call
*c
,
124 struct ctdb_ltdb_header
*header
)
126 uint32_t lmaster
= ctdb_lmaster(ctdb
, &key
);
128 c
->hdr
.destnode
= lmaster
;
129 if (ctdb
->pnn
== lmaster
) {
130 c
->hdr
.destnode
= header
->dmaster
;
134 if (c
->hopcount
%100 == 99) {
135 DEBUG(DEBUG_WARNING
,("High hopcount %d dbid:0x%08x "
136 "key:0x%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c
->hopcount
, ctdb_db
->db_id
, ctdb_hash(&key
),
139 ctdb
->pnn
, c
->hdr
.srcnode
, lmaster
,
140 header
->dmaster
, c
->hdr
.destnode
));
143 ctdb_queue_packet(ctdb
, &c
->hdr
);
150 caller must have the chainlock before calling this routine. Caller must be
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context
*ctdb_db
,
154 struct ctdb_ltdb_header
*header
,
155 TDB_DATA key
, TDB_DATA data
,
156 uint32_t new_dmaster
,
159 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
160 struct ctdb_reply_dmaster
*r
;
164 if (ctdb
->pnn
!= ctdb_lmaster(ctdb
, &key
)) {
165 DEBUG(DEBUG_ALERT
,(__location__
" Caller is not lmaster!\n"));
169 header
->dmaster
= new_dmaster
;
170 ret
= ctdb_ltdb_store(ctdb_db
, key
, header
, data
);
172 ctdb_fatal(ctdb
, "ctdb_send_dmaster_reply unable to update dmaster");
176 if (ctdb
->methods
== NULL
) {
177 ctdb_fatal(ctdb
, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx
= talloc_new(ctdb
);
185 /* send the CTDB_REPLY_DMASTER */
186 len
= offsetof(struct ctdb_reply_dmaster
, data
) + key
.dsize
+ data
.dsize
+ sizeof(uint32_t);
187 r
= ctdb_transport_allocate(ctdb
, tmp_ctx
, CTDB_REPLY_DMASTER
, len
,
188 struct ctdb_reply_dmaster
);
189 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
191 r
->hdr
.destnode
= new_dmaster
;
192 r
->hdr
.reqid
= reqid
;
193 r
->rsn
= header
->rsn
;
194 r
->keylen
= key
.dsize
;
195 r
->datalen
= data
.dsize
;
196 r
->db_id
= ctdb_db
->db_id
;
197 memcpy(&r
->data
[0], key
.dptr
, key
.dsize
);
198 memcpy(&r
->data
[key
.dsize
], data
.dptr
, data
.dsize
);
199 memcpy(&r
->data
[key
.dsize
+data
.dsize
], &header
->flags
, sizeof(uint32_t));
201 ctdb_queue_packet(ctdb
, &r
->hdr
);
203 talloc_free(tmp_ctx
);
207 send a dmaster request (give another node the dmaster for a record)
209 This is always sent to the lmaster, which ensures that the lmaster
210 always knows who the dmaster is. The lmaster will then send a
211 CTDB_REPLY_DMASTER to the new dmaster
213 static void ctdb_call_send_dmaster(struct ctdb_db_context
*ctdb_db
,
214 struct ctdb_req_call
*c
,
215 struct ctdb_ltdb_header
*header
,
216 TDB_DATA
*key
, TDB_DATA
*data
)
218 struct ctdb_req_dmaster
*r
;
219 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
221 uint32_t lmaster
= ctdb_lmaster(ctdb
, key
);
223 if (ctdb
->methods
== NULL
) {
224 ctdb_fatal(ctdb
, "Failed ctdb_call_send_dmaster since transport is down");
228 if (data
->dsize
!= 0) {
229 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
232 if (lmaster
== ctdb
->pnn
) {
233 ctdb_send_dmaster_reply(ctdb_db
, header
, *key
, *data
,
234 c
->hdr
.srcnode
, c
->hdr
.reqid
);
238 len
= offsetof(struct ctdb_req_dmaster
, data
) + key
->dsize
+ data
->dsize
240 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_DMASTER
, len
,
241 struct ctdb_req_dmaster
);
242 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
243 r
->hdr
.destnode
= lmaster
;
244 r
->hdr
.reqid
= c
->hdr
.reqid
;
246 r
->rsn
= header
->rsn
;
247 r
->dmaster
= c
->hdr
.srcnode
;
248 r
->keylen
= key
->dsize
;
249 r
->datalen
= data
->dsize
;
250 memcpy(&r
->data
[0], key
->dptr
, key
->dsize
);
251 memcpy(&r
->data
[key
->dsize
], data
->dptr
, data
->dsize
);
252 memcpy(&r
->data
[key
->dsize
+ data
->dsize
], &header
->flags
, sizeof(uint32_t));
254 header
->dmaster
= c
->hdr
.srcnode
;
255 if (ctdb_ltdb_store(ctdb_db
, *key
, header
, *data
) != 0) {
256 ctdb_fatal(ctdb
, "Failed to store record in ctdb_call_send_dmaster");
259 ctdb_queue_packet(ctdb
, &r
->hdr
);
264 static void ctdb_sticky_pindown_timeout(struct event_context
*ev
, struct timed_event
*te
,
265 struct timeval t
, void *private_data
)
267 struct ctdb_sticky_record
*sr
= talloc_get_type(private_data
,
268 struct ctdb_sticky_record
);
270 DEBUG(DEBUG_ERR
,("Pindown timeout db:%s unstick record\n", sr
->ctdb_db
->db_name
));
271 if (sr
->pindown
!= NULL
) {
272 talloc_free(sr
->pindown
);
278 ctdb_set_sticky_pindown(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
280 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
282 struct ctdb_sticky_record
*sr
;
284 k
= talloc_zero_size(tmp_ctx
, ((key
.dsize
+ 3) & 0xfffffffc) + 4);
286 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
287 talloc_free(tmp_ctx
);
291 k
[0] = (key
.dsize
+ 3) / 4 + 1;
292 memcpy(&k
[1], key
.dptr
, key
.dsize
);
294 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
296 talloc_free(tmp_ctx
);
300 talloc_free(tmp_ctx
);
302 if (sr
->pindown
== NULL
) {
303 DEBUG(DEBUG_ERR
,("Pinning down record in %s for %d ms\n", ctdb_db
->db_name
, ctdb
->tunable
.sticky_pindown
));
304 sr
->pindown
= talloc_new(sr
);
305 if (sr
->pindown
== NULL
) {
306 DEBUG(DEBUG_ERR
,("Failed to allocate pindown context for sticky record\n"));
309 event_add_timed(ctdb
->ev
, sr
->pindown
, timeval_current_ofs(ctdb
->tunable
.sticky_pindown
/ 1000, (ctdb
->tunable
.sticky_pindown
* 1000) % 1000000), ctdb_sticky_pindown_timeout
, sr
);
316 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
319 must be called with the chainlock held. This function releases the chainlock
321 static void ctdb_become_dmaster(struct ctdb_db_context
*ctdb_db
,
322 struct ctdb_req_header
*hdr
,
323 TDB_DATA key
, TDB_DATA data
,
324 uint64_t rsn
, uint32_t record_flags
)
326 struct ctdb_call_state
*state
;
327 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
328 struct ctdb_ltdb_header header
;
331 DEBUG(DEBUG_DEBUG
,("pnn %u dmaster response %08x\n", ctdb
->pnn
, ctdb_hash(&key
)));
335 header
.dmaster
= ctdb
->pnn
;
336 header
.flags
= record_flags
;
338 state
= ctdb_reqid_find(ctdb
, hdr
->reqid
, struct ctdb_call_state
);
341 if (state
->call
->flags
& CTDB_CALL_FLAG_VACUUM_MIGRATION
) {
343 * We temporarily add the VACUUM_MIGRATED flag to
344 * the record flags, so that ctdb_ltdb_store can
345 * decide whether the record should be stored or
348 header
.flags
|= CTDB_REC_FLAG_VACUUM_MIGRATED
;
352 if (ctdb_ltdb_store(ctdb_db
, key
, &header
, data
) != 0) {
353 ctdb_fatal(ctdb
, "ctdb_reply_dmaster store failed\n");
355 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
357 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
362 /* we just became DMASTER and this database is "sticky",
363 see if the record is flagged as "hot" and set up a pin-down
364 context to stop migrations for a little while if so
366 if (ctdb_db
->sticky
) {
367 ctdb_set_sticky_pindown(ctdb
, ctdb_db
, key
);
371 DEBUG(DEBUG_ERR
,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372 ctdb
->pnn
, hdr
->reqid
, hdr
->srcnode
));
374 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
376 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
381 if (key
.dsize
!= state
->call
->key
.dsize
|| memcmp(key
.dptr
, state
->call
->key
.dptr
, key
.dsize
)) {
382 DEBUG(DEBUG_ERR
, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr
->reqid
, hdr
->srcnode
));
384 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
386 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
391 if (hdr
->reqid
!= state
->reqid
) {
392 /* we found a record but it was the wrong one */
393 DEBUG(DEBUG_ERR
, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr
->reqid
, hdr
->srcnode
));
395 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
397 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
402 ctdb_call_local(ctdb_db
, state
->call
, &header
, state
, &data
, true, ctdb
->pnn
);
404 ret
= ctdb_ltdb_unlock(ctdb_db
, state
->call
->key
);
406 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
409 state
->state
= CTDB_CALL_DONE
;
410 if (state
->async
.fn
) {
411 state
->async
.fn(state
);
418 called when a CTDB_REQ_DMASTER packet comes in
420 this comes into the lmaster for a record when the current dmaster
421 wants to give up the dmaster role and give it to someone else
423 void ctdb_request_dmaster(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
425 struct ctdb_req_dmaster
*c
= (struct ctdb_req_dmaster
*)hdr
;
426 TDB_DATA key
, data
, data2
;
427 struct ctdb_ltdb_header header
;
428 struct ctdb_db_context
*ctdb_db
;
429 uint32_t record_flags
= 0;
434 key
.dsize
= c
->keylen
;
435 data
.dptr
= c
->data
+ c
->keylen
;
436 data
.dsize
= c
->datalen
;
437 len
= offsetof(struct ctdb_req_dmaster
, data
) + key
.dsize
+ data
.dsize
439 if (len
<= c
->hdr
.length
) {
440 record_flags
= *(uint32_t *)&c
->data
[c
->keylen
+ c
->datalen
];
443 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
445 ctdb_send_error(ctdb
, hdr
, -1,
446 "Unknown database in request. db_id==0x%08x",
451 /* fetch the current record */
452 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, key
, &header
, hdr
, &data2
,
453 ctdb_call_input_pkt
, ctdb
, false);
455 ctdb_fatal(ctdb
, "ctdb_req_dmaster failed to fetch record");
459 DEBUG(DEBUG_INFO
,(__location__
" deferring ctdb_request_dmaster\n"));
463 if (ctdb_lmaster(ctdb
, &key
) != ctdb
->pnn
) {
464 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
465 ctdb
->pnn
, ctdb_lmaster(ctdb
, &key
),
466 hdr
->generation
, ctdb
->vnn_map
->generation
));
467 ctdb_fatal(ctdb
, "ctdb_req_dmaster to non-lmaster");
470 DEBUG(DEBUG_DEBUG
,("pnn %u dmaster request on %08x for %u from %u\n",
471 ctdb
->pnn
, ctdb_hash(&key
), c
->dmaster
, c
->hdr
.srcnode
));
473 /* its a protocol error if the sending node is not the current dmaster */
474 if (header
.dmaster
!= hdr
->srcnode
) {
475 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
476 ctdb
->pnn
, c
->dmaster
, hdr
->srcnode
, header
.dmaster
, ctdb_hash(&key
),
477 ctdb_db
->db_id
, hdr
->generation
, ctdb
->vnn_map
->generation
,
478 (unsigned long long)c
->rsn
, (unsigned long long)header
.rsn
, c
->hdr
.reqid
,
479 (key
.dsize
>= 4)?(*(uint32_t *)key
.dptr
):0));
480 if (header
.rsn
!= 0 || header
.dmaster
!= ctdb
->pnn
) {
481 DEBUG(DEBUG_ERR
,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
483 ctdb
->recovery_mode
= CTDB_RECOVERY_ACTIVE
;
484 ctdb_ltdb_unlock(ctdb_db
, key
);
489 if (header
.rsn
> c
->rsn
) {
490 DEBUG(DEBUG_ALERT
,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
491 ctdb
->pnn
, c
->dmaster
, hdr
->srcnode
, header
.dmaster
, ctdb_hash(&key
),
492 ctdb_db
->db_id
, hdr
->generation
, ctdb
->vnn_map
->generation
,
493 (unsigned long long)c
->rsn
, (unsigned long long)header
.rsn
, c
->hdr
.reqid
));
496 /* use the rsn from the sending node */
499 /* store the record flags from the sending node */
500 header
.flags
= record_flags
;
502 /* check if the new dmaster is the lmaster, in which case we
503 skip the dmaster reply */
504 if (c
->dmaster
== ctdb
->pnn
) {
505 ctdb_become_dmaster(ctdb_db
, hdr
, key
, data
, c
->rsn
, record_flags
);
507 ctdb_send_dmaster_reply(ctdb_db
, &header
, key
, data
, c
->dmaster
, hdr
->reqid
);
509 ret
= ctdb_ltdb_unlock(ctdb_db
, key
);
511 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
516 static void ctdb_sticky_record_timeout(struct event_context
*ev
, struct timed_event
*te
,
517 struct timeval t
, void *private_data
)
519 struct ctdb_sticky_record
*sr
= talloc_get_type(private_data
,
520 struct ctdb_sticky_record
);
524 static void *ctdb_make_sticky_record_callback(void *parm
, void *data
)
527 DEBUG(DEBUG_ERR
,("Already have sticky record registered. Free old %p and create new %p\n", data
, parm
));
534 ctdb_make_record_sticky(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
536 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
538 struct ctdb_sticky_record
*sr
;
540 k
= talloc_zero_size(tmp_ctx
, ((key
.dsize
+ 3) & 0xfffffffc) + 4);
542 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
543 talloc_free(tmp_ctx
);
547 k
[0] = (key
.dsize
+ 3) / 4 + 1;
548 memcpy(&k
[1], key
.dptr
, key
.dsize
);
550 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
552 talloc_free(tmp_ctx
);
556 sr
= talloc(ctdb_db
->sticky_records
, struct ctdb_sticky_record
);
558 talloc_free(tmp_ctx
);
559 DEBUG(DEBUG_ERR
,("Failed to allocate sticky record structure\n"));
564 sr
->ctdb_db
= ctdb_db
;
567 DEBUG(DEBUG_ERR
,("Make record sticky in db %s\n", ctdb_db
->db_name
));
569 trbt_insertarray32_callback(ctdb_db
->sticky_records
, k
[0], &k
[0], ctdb_make_sticky_record_callback
, sr
);
571 event_add_timed(ctdb
->ev
, sr
, timeval_current_ofs(ctdb
->tunable
.sticky_duration
, 0), ctdb_sticky_record_timeout
, sr
);
573 talloc_free(tmp_ctx
);
/*
  carries a deferred request packet from pinned_down_destructor() to the
  pinned_down_requeue() timer callback
*/
struct pinned_down_requeue_handle {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
};
/*
  a request packet that is held back while its record is pinned down;
  allocated on the sticky record's pindown context
*/
struct pinned_down_deferred_call {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;
};
587 static void pinned_down_requeue(struct event_context
*ev
, struct timed_event
*te
,
588 struct timeval t
, void *private_data
)
590 struct pinned_down_requeue_handle
*handle
= talloc_get_type(private_data
, struct pinned_down_requeue_handle
);
591 struct ctdb_context
*ctdb
= handle
->ctdb
;
593 talloc_steal(ctdb
, handle
->hdr
);
594 ctdb_call_input_pkt(ctdb
, handle
->hdr
);
599 static int pinned_down_destructor(struct pinned_down_deferred_call
*pinned_down
)
601 struct ctdb_context
*ctdb
= pinned_down
->ctdb
;
602 struct pinned_down_requeue_handle
*handle
= talloc(ctdb
, struct pinned_down_requeue_handle
);
604 handle
->ctdb
= pinned_down
->ctdb
;
605 handle
->hdr
= pinned_down
->hdr
;
606 talloc_steal(handle
, handle
->hdr
);
608 event_add_timed(ctdb
->ev
, handle
, timeval_zero(), pinned_down_requeue
, handle
);
614 ctdb_defer_pinned_down_request(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_req_header
*hdr
)
616 TALLOC_CTX
*tmp_ctx
= talloc_new(NULL
);
618 struct ctdb_sticky_record
*sr
;
619 struct pinned_down_deferred_call
*pinned_down
;
621 k
= talloc_zero_size(tmp_ctx
, ((key
.dsize
+ 3) & 0xfffffffc) + 4);
623 DEBUG(DEBUG_ERR
,("Failed to allocate key for sticky record\n"));
624 talloc_free(tmp_ctx
);
628 k
[0] = (key
.dsize
+ 3) / 4 + 1;
629 memcpy(&k
[1], key
.dptr
, key
.dsize
);
631 sr
= trbt_lookuparray32(ctdb_db
->sticky_records
, k
[0], &k
[0]);
633 talloc_free(tmp_ctx
);
637 talloc_free(tmp_ctx
);
639 if (sr
->pindown
== NULL
) {
643 pinned_down
= talloc(sr
->pindown
, struct pinned_down_deferred_call
);
644 if (pinned_down
== NULL
) {
645 DEBUG(DEBUG_ERR
,("Failed to allocate structure for deferred pinned down request\n"));
649 pinned_down
->ctdb
= ctdb
;
650 pinned_down
->hdr
= hdr
;
652 talloc_set_destructor(pinned_down
, pinned_down_destructor
);
653 talloc_steal(pinned_down
, hdr
);
659 ctdb_update_db_stat_hot_keys(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, int hopcount
)
663 /* smallest value is always at index 0 */
664 if (hopcount
<= ctdb_db
->statistics
.hot_keys
[0].count
) {
668 /* see if we already know this key */
669 for (i
= 0; i
< MAX_HOT_KEYS
; i
++) {
670 if (key
.dsize
!= ctdb_db
->statistics
.hot_keys
[i
].key
.dsize
) {
673 if (memcmp(key
.dptr
, ctdb_db
->statistics
.hot_keys
[i
].key
.dptr
, key
.dsize
)) {
676 /* found an entry for this key */
677 if (hopcount
<= ctdb_db
->statistics
.hot_keys
[i
].count
) {
680 ctdb_db
->statistics
.hot_keys
[i
].count
= hopcount
;
684 if (ctdb_db
->statistics
.hot_keys
[0].key
.dptr
!= NULL
) {
685 talloc_free(ctdb_db
->statistics
.hot_keys
[0].key
.dptr
);
687 ctdb_db
->statistics
.hot_keys
[0].key
.dsize
= key
.dsize
;
688 ctdb_db
->statistics
.hot_keys
[0].key
.dptr
= talloc_memdup(ctdb_db
, key
.dptr
, key
.dsize
);
689 ctdb_db
->statistics
.hot_keys
[0].count
= hopcount
;
693 for (i
= 2; i
< MAX_HOT_KEYS
; i
++) {
694 if (ctdb_db
->statistics
.hot_keys
[i
].count
< ctdb_db
->statistics
.hot_keys
[0].count
) {
695 hopcount
= ctdb_db
->statistics
.hot_keys
[i
].count
;
696 ctdb_db
->statistics
.hot_keys
[i
].count
= ctdb_db
->statistics
.hot_keys
[0].count
;
697 ctdb_db
->statistics
.hot_keys
[0].count
= hopcount
;
699 key
= ctdb_db
->statistics
.hot_keys
[i
].key
;
700 ctdb_db
->statistics
.hot_keys
[i
].key
= ctdb_db
->statistics
.hot_keys
[0].key
;
701 ctdb_db
->statistics
.hot_keys
[0].key
= key
;
707 called when a CTDB_REQ_CALL packet comes in
709 void ctdb_request_call(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
711 struct ctdb_req_call
*c
= (struct ctdb_req_call
*)hdr
;
713 struct ctdb_reply_call
*r
;
715 struct ctdb_ltdb_header header
;
716 struct ctdb_call
*call
;
717 struct ctdb_db_context
*ctdb_db
;
718 int tmp_count
, bucket
;
720 if (ctdb
->methods
== NULL
) {
721 DEBUG(DEBUG_INFO
,(__location__
" Failed ctdb_request_call. Transport is DOWN\n"));
726 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
728 ctdb_send_error(ctdb
, hdr
, -1,
729 "Unknown database in request. db_id==0x%08x",
734 call
= talloc(hdr
, struct ctdb_call
);
735 CTDB_NO_MEMORY_FATAL(ctdb
, call
);
737 call
->call_id
= c
->callid
;
738 call
->key
.dptr
= c
->data
;
739 call
->key
.dsize
= c
->keylen
;
740 call
->call_data
.dptr
= c
->data
+ c
->keylen
;
741 call
->call_data
.dsize
= c
->calldatalen
;
742 call
->reply_data
.dptr
= NULL
;
743 call
->reply_data
.dsize
= 0;
746 /* If this record is pinned down we should defer the
747 request until the pindown times out
749 if (ctdb_db
->sticky
) {
750 if (ctdb_defer_pinned_down_request(ctdb
, ctdb_db
, call
->key
, hdr
) == 0) {
751 DEBUG(DEBUG_WARNING
,("Defer request for pinned down record in %s\n", ctdb_db
->db_name
));
757 /* determine if we are the dmaster for this key. This also
758 fetches the record data (if any), thus avoiding a 2nd fetch of the data
759 if the call will be answered locally */
761 ret
= ctdb_ltdb_lock_fetch_requeue(ctdb_db
, call
->key
, &header
, hdr
, &data
,
762 ctdb_call_input_pkt
, ctdb
, false);
764 ctdb_send_error(ctdb
, hdr
, ret
, "ltdb fetch failed in ctdb_request_call");
768 DEBUG(DEBUG_INFO
,(__location__
" deferred ctdb_request_call\n"));
772 /* Dont do READONLY if we dont have a tracking database */
773 if ((c
->flags
& CTDB_WANT_READONLY
) && !ctdb_db
->readonly
) {
774 c
->flags
&= ~CTDB_WANT_READONLY
;
777 if (header
.flags
& CTDB_REC_RO_REVOKE_COMPLETE
) {
778 header
.flags
&= ~CTDB_REC_RO_FLAGS
;
779 CTDB_INCREMENT_STAT(ctdb
, total_ro_revokes
);
780 CTDB_INCREMENT_DB_STAT(ctdb_db
, db_ro_revokes
);
781 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
782 ctdb_fatal(ctdb
, "Failed to write header with cleared REVOKE flag");
784 /* and clear out the tracking data */
785 if (tdb_delete(ctdb_db
->rottdb
, call
->key
) != 0) {
786 DEBUG(DEBUG_ERR
,(__location__
" Failed to clear out trackingdb record\n"));
790 /* if we are revoking, we must defer all other calls until the revoke
793 if (header
.flags
& CTDB_REC_RO_REVOKING_READONLY
) {
794 talloc_free(data
.dptr
);
795 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
797 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, call
->key
, hdr
, ctdb_call_input_pkt
, ctdb
) != 0) {
798 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
804 /* if we are not the dmaster and are not hosting any delegations,
805 then send a redirect to the requesting node */
806 if ((header
.dmaster
!= ctdb
->pnn
)
807 && (!(header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
)) ) {
808 talloc_free(data
.dptr
);
809 ctdb_call_send_redirect(ctdb
, ctdb_db
, call
->key
, c
, &header
);
811 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
813 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
818 if ( (!(c
->flags
& CTDB_WANT_READONLY
))
819 && (header
.flags
& (CTDB_REC_RO_HAVE_DELEGATIONS
|CTDB_REC_RO_HAVE_READONLY
)) ) {
820 header
.flags
|= CTDB_REC_RO_REVOKING_READONLY
;
821 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
822 ctdb_fatal(ctdb
, "Failed to store record with HAVE_DELEGATIONS set");
824 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
826 if (ctdb_start_revoke_ro_record(ctdb
, ctdb_db
, call
->key
, &header
, data
) != 0) {
827 ctdb_fatal(ctdb
, "Failed to start record revoke");
829 talloc_free(data
.dptr
);
831 if (ctdb_add_revoke_deferred_call(ctdb
, ctdb_db
, call
->key
, hdr
, ctdb_call_input_pkt
, ctdb
) != 0) {
832 ctdb_fatal(ctdb
, "Failed to add deferred call for revoke child");
839 /* If this is the first request for delegation. bump rsn and set
840 * the delegations flag
842 if ((c
->flags
& CTDB_WANT_READONLY
)
843 && (c
->callid
== CTDB_FETCH_WITH_HEADER_FUNC
)
844 && (!(header
.flags
& CTDB_REC_RO_HAVE_DELEGATIONS
))) {
846 header
.flags
|= CTDB_REC_RO_HAVE_DELEGATIONS
;
847 if (ctdb_ltdb_store(ctdb_db
, call
->key
, &header
, data
) != 0) {
848 ctdb_fatal(ctdb
, "Failed to store record with HAVE_DELEGATIONS set");
851 if ((c
->flags
& CTDB_WANT_READONLY
)
852 && (call
->call_id
== CTDB_FETCH_WITH_HEADER_FUNC
)) {
855 tdata
= tdb_fetch(ctdb_db
->rottdb
, call
->key
);
856 if (ctdb_trackingdb_add_pnn(ctdb
, &tdata
, c
->hdr
.srcnode
) != 0) {
857 ctdb_fatal(ctdb
, "Failed to add node to trackingdb");
859 if (tdb_store(ctdb_db
->rottdb
, call
->key
, tdata
, TDB_REPLACE
) != 0) {
860 ctdb_fatal(ctdb
, "Failed to store trackingdb data");
864 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
866 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
869 len
= offsetof(struct ctdb_reply_call
, data
) + data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
870 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REPLY_CALL
, len
,
871 struct ctdb_reply_call
);
872 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
873 r
->hdr
.destnode
= c
->hdr
.srcnode
;
874 r
->hdr
.reqid
= c
->hdr
.reqid
;
876 r
->datalen
= data
.dsize
+ sizeof(struct ctdb_ltdb_header
);
878 header
.flags
|= CTDB_REC_RO_HAVE_READONLY
;
879 header
.flags
&= ~CTDB_REC_RO_HAVE_DELEGATIONS
;
880 memcpy(&r
->data
[0], &header
, sizeof(struct ctdb_ltdb_header
));
883 memcpy(&r
->data
[sizeof(struct ctdb_ltdb_header
)], data
.dptr
, data
.dsize
);
886 ctdb_queue_packet(ctdb
, &r
->hdr
);
887 CTDB_INCREMENT_STAT(ctdb
, total_ro_delegations
);
888 CTDB_INCREMENT_DB_STAT(ctdb_db
, db_ro_delegations
);
894 CTDB_UPDATE_STAT(ctdb
, max_hop_count
, c
->hopcount
);
895 tmp_count
= c
->hopcount
;
901 if (bucket
>= MAX_COUNT_BUCKETS
) {
902 bucket
= MAX_COUNT_BUCKETS
- 1;
904 CTDB_INCREMENT_STAT(ctdb
, hop_count_bucket
[bucket
]);
905 CTDB_INCREMENT_DB_STAT(ctdb_db
, hop_count_bucket
[bucket
]);
906 ctdb_update_db_stat_hot_keys(ctdb_db
, call
->key
, c
->hopcount
);
908 /* If this database supports sticky records, then check if the
909 hopcount is big. If it is it means the record is hot and we
910 should make it sticky.
912 if (ctdb_db
->sticky
&& c
->hopcount
>= ctdb
->tunable
.hopcount_make_sticky
) {
913 DEBUG(DEBUG_ERR
, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db
->db_name
, c
->hopcount
, ctdb
->tunable
.sticky_duration
));
914 ctdb_make_record_sticky(ctdb
, ctdb_db
, call
->key
);
918 /* if this nodes has done enough consecutive calls on the same record
919 then give them the record
920 or if the node requested an immediate migration
922 if ( c
->hdr
.srcnode
!= ctdb
->pnn
&&
923 ((header
.laccessor
== c
->hdr
.srcnode
924 && header
.lacount
>= ctdb
->tunable
.max_lacount
925 && ctdb
->tunable
.max_lacount
!= 0)
926 || (c
->flags
& CTDB_IMMEDIATE_MIGRATION
)) ) {
927 if (ctdb_db
->transaction_active
) {
928 DEBUG(DEBUG_INFO
, (__location__
" refusing migration"
929 " of key %s while transaction is active\n",
930 (char *)call
->key
.dptr
));
932 DEBUG(DEBUG_DEBUG
,("pnn %u starting migration of %08x to %u\n",
933 ctdb
->pnn
, ctdb_hash(&(call
->key
)), c
->hdr
.srcnode
));
934 ctdb_call_send_dmaster(ctdb_db
, c
, &header
, &(call
->key
), &data
);
935 talloc_free(data
.dptr
);
937 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
939 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
945 ret
= ctdb_call_local(ctdb_db
, call
, &header
, hdr
, &data
, true, c
->hdr
.srcnode
);
947 DEBUG(DEBUG_ERR
,(__location__
" ctdb_call_local failed\n"));
951 ret
= ctdb_ltdb_unlock(ctdb_db
, call
->key
);
953 DEBUG(DEBUG_ERR
,(__location__
" ctdb_ltdb_unlock() failed with error %d\n", ret
));
956 len
= offsetof(struct ctdb_reply_call
, data
) + call
->reply_data
.dsize
;
957 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REPLY_CALL
, len
,
958 struct ctdb_reply_call
);
959 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
960 r
->hdr
.destnode
= hdr
->srcnode
;
961 r
->hdr
.reqid
= hdr
->reqid
;
962 r
->status
= call
->status
;
963 r
->datalen
= call
->reply_data
.dsize
;
964 if (call
->reply_data
.dsize
) {
965 memcpy(&r
->data
[0], call
->reply_data
.dptr
, call
->reply_data
.dsize
);
968 ctdb_queue_packet(ctdb
, &r
->hdr
);
974 called when a CTDB_REPLY_CALL packet comes in
976 This packet comes in response to a CTDB_REQ_CALL request packet. It
977 contains any reply data from the call
979 void ctdb_reply_call(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
981 struct ctdb_reply_call
*c
= (struct ctdb_reply_call
*)hdr
;
982 struct ctdb_call_state
*state
;
984 state
= ctdb_reqid_find(ctdb
, hdr
->reqid
, struct ctdb_call_state
);
986 DEBUG(DEBUG_ERR
, (__location__
" reqid %u not found\n", hdr
->reqid
));
990 if (hdr
->reqid
!= state
->reqid
) {
991 /* we found a record but it was the wrong one */
992 DEBUG(DEBUG_ERR
, ("Dropped orphaned call reply with reqid:%u\n",hdr
->reqid
));
997 /* read only delegation processing */
998 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
999 * delegation since we may need to update the record header
1001 if (state
->c
->callid
== CTDB_FETCH_WITH_HEADER_FUNC
) {
1002 struct ctdb_db_context
*ctdb_db
= state
->ctdb_db
;
1003 struct ctdb_ltdb_header
*header
= (struct ctdb_ltdb_header
*)&c
->data
[0];
1004 struct ctdb_ltdb_header oldheader
;
1005 TDB_DATA key
, data
, olddata
;
1008 if (!(header
->flags
& CTDB_REC_RO_HAVE_READONLY
)) {
1013 key
.dsize
= state
->c
->keylen
;
1014 key
.dptr
= state
->c
->data
;
1015 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
,
1016 ctdb_call_input_pkt
, ctdb
, false);
1021 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock in ctdb_reply_call\n"));
1025 ret
= ctdb_ltdb_fetch(ctdb_db
, key
, &oldheader
, state
, &olddata
);
1027 DEBUG(DEBUG_ERR
, ("Failed to fetch old record in ctdb_reply_call\n"));
1028 ctdb_ltdb_unlock(ctdb_db
, key
);
1032 if (header
->rsn
<= oldheader
.rsn
) {
1033 ctdb_ltdb_unlock(ctdb_db
, key
);
1037 if (c
->datalen
< sizeof(struct ctdb_ltdb_header
)) {
1038 DEBUG(DEBUG_ERR
,(__location__
" Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c
->datalen
));
1039 ctdb_ltdb_unlock(ctdb_db
, key
);
1043 data
.dsize
= c
->datalen
- sizeof(struct ctdb_ltdb_header
);
1044 data
.dptr
= &c
->data
[sizeof(struct ctdb_ltdb_header
)];
1045 ret
= ctdb_ltdb_store(ctdb_db
, key
, header
, data
);
1047 DEBUG(DEBUG_ERR
, ("Failed to store new record in ctdb_reply_call\n"));
1048 ctdb_ltdb_unlock(ctdb_db
, key
);
1052 ctdb_ltdb_unlock(ctdb_db
, key
);
1056 state
->call
->reply_data
.dptr
= c
->data
;
1057 state
->call
->reply_data
.dsize
= c
->datalen
;
1058 state
->call
->status
= c
->status
;
1060 talloc_steal(state
, c
);
1062 state
->state
= CTDB_CALL_DONE
;
1063 if (state
->async
.fn
) {
1064 state
->async
.fn(state
);
1070 called when a CTDB_REPLY_DMASTER packet comes in
1072 This packet comes in from the lmaster response to a CTDB_REQ_CALL
1073 request packet. It means that the current dmaster wants to give us
1076 void ctdb_reply_dmaster(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1078 struct ctdb_reply_dmaster
*c
= (struct ctdb_reply_dmaster
*)hdr
;
1079 struct ctdb_db_context
*ctdb_db
;
1081 uint32_t record_flags
= 0;
1085 ctdb_db
= find_ctdb_db(ctdb
, c
->db_id
);
1086 if (ctdb_db
== NULL
) {
1087 DEBUG(DEBUG_ERR
,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c
->db_id
));
1092 key
.dsize
= c
->keylen
;
1093 data
.dptr
= &c
->data
[key
.dsize
];
1094 data
.dsize
= c
->datalen
;
1095 len
= offsetof(struct ctdb_reply_dmaster
, data
) + key
.dsize
+ data
.dsize
1097 if (len
<= c
->hdr
.length
) {
1098 record_flags
= *(uint32_t *)&c
->data
[c
->keylen
+ c
->datalen
];
1101 ret
= ctdb_ltdb_lock_requeue(ctdb_db
, key
, hdr
,
1102 ctdb_call_input_pkt
, ctdb
, false);
1107 DEBUG(DEBUG_ERR
,(__location__
" Failed to get lock in ctdb_reply_dmaster\n"));
1111 ctdb_become_dmaster(ctdb_db
, hdr
, key
, data
, c
->rsn
, record_flags
);
1116 called when a CTDB_REPLY_ERROR packet comes in
1118 void ctdb_reply_error(struct ctdb_context
*ctdb
, struct ctdb_req_header
*hdr
)
1120 struct ctdb_reply_error
*c
= (struct ctdb_reply_error
*)hdr
;
1121 struct ctdb_call_state
*state
;
1123 state
= ctdb_reqid_find(ctdb
, hdr
->reqid
, struct ctdb_call_state
);
1124 if (state
== NULL
) {
1125 DEBUG(DEBUG_ERR
,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1126 ctdb
->pnn
, hdr
->reqid
));
1130 if (hdr
->reqid
!= state
->reqid
) {
1131 /* we found a record but it was the wrong one */
1132 DEBUG(DEBUG_ERR
, ("Dropped orphaned error reply with reqid:%u\n",hdr
->reqid
));
1136 talloc_steal(state
, c
);
1138 state
->state
= CTDB_CALL_ERROR
;
1139 state
->errmsg
= (char *)c
->msg
;
1140 if (state
->async
.fn
) {
1141 state
->async
.fn(state
);
1149 static int ctdb_call_destructor(struct ctdb_call_state
*state
)
1151 DLIST_REMOVE(state
->ctdb_db
->ctdb
->pending_calls
, state
);
1152 ctdb_reqid_remove(state
->ctdb_db
->ctdb
, state
->reqid
);
1158 called when a ctdb_call needs to be resent after a reconfigure event
1160 static void ctdb_call_resend(struct ctdb_call_state
*state
)
1162 struct ctdb_context
*ctdb
= state
->ctdb_db
->ctdb
;
1164 state
->generation
= ctdb
->vnn_map
->generation
;
1166 /* use a new reqid, in case the old reply does eventually come in */
1167 ctdb_reqid_remove(ctdb
, state
->reqid
);
1168 state
->reqid
= ctdb_reqid_new(ctdb
, state
);
1169 state
->c
->hdr
.reqid
= state
->reqid
;
1171 /* update the generation count for this request, so its valid with the new vnn_map */
1172 state
->c
->hdr
.generation
= state
->generation
;
1174 /* send the packet to ourselves, it will be redirected appropriately */
1175 state
->c
->hdr
.destnode
= ctdb
->pnn
;
1177 ctdb_queue_packet(ctdb
, &state
->c
->hdr
);
1178 DEBUG(DEBUG_NOTICE
,("resent ctdb_call\n"));
1182 resend all pending calls on recovery
1184 void ctdb_call_resend_all(struct ctdb_context
*ctdb
)
1186 struct ctdb_call_state
*state
, *next
;
1187 for (state
=ctdb
->pending_calls
;state
;state
=next
) {
1189 ctdb_call_resend(state
);
1194 this allows the caller to setup a async.fn
1196 static void call_local_trigger(struct event_context
*ev
, struct timed_event
*te
,
1197 struct timeval t
, void *private_data
)
1199 struct ctdb_call_state
*state
= talloc_get_type(private_data
, struct ctdb_call_state
);
1200 if (state
->async
.fn
) {
1201 state
->async
.fn(state
);
1207 construct an event driven local ctdb_call
1209 this is used so that locally processed ctdb_call requests are processed
1210 in an event driven manner
1212 struct ctdb_call_state
*ctdb_call_local_send(struct ctdb_db_context
*ctdb_db
,
1213 struct ctdb_call
*call
,
1214 struct ctdb_ltdb_header
*header
,
1217 struct ctdb_call_state
*state
;
1218 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1221 state
= talloc_zero(ctdb_db
, struct ctdb_call_state
);
1222 CTDB_NO_MEMORY_NULL(ctdb
, state
);
1224 talloc_steal(state
, data
->dptr
);
1226 state
->state
= CTDB_CALL_DONE
;
1227 state
->call
= talloc(state
, struct ctdb_call
);
1228 CTDB_NO_MEMORY_NULL(ctdb
, state
->call
);
1229 *(state
->call
) = *call
;
1230 state
->ctdb_db
= ctdb_db
;
1232 ret
= ctdb_call_local(ctdb_db
, state
->call
, header
, state
, data
, true, ctdb
->pnn
);
1234 DEBUG(DEBUG_DEBUG
,("ctdb_call_local() failed, ignoring return code %d\n", ret
));
1237 event_add_timed(ctdb
->ev
, state
, timeval_zero(), call_local_trigger
, state
);
1244 make a remote ctdb call - async send. Called in daemon context.
1246 This constructs a ctdb_call request and queues it for processing.
1247 This call never blocks.
1249 struct ctdb_call_state
*ctdb_daemon_call_send_remote(struct ctdb_db_context
*ctdb_db
,
1250 struct ctdb_call
*call
,
1251 struct ctdb_ltdb_header
*header
)
1254 struct ctdb_call_state
*state
;
1255 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
1257 if (ctdb
->methods
== NULL
) {
1258 DEBUG(DEBUG_INFO
,(__location__
" Failed send packet. Transport is down\n"));
1262 state
= talloc_zero(ctdb_db
, struct ctdb_call_state
);
1263 CTDB_NO_MEMORY_NULL(ctdb
, state
);
1264 state
->call
= talloc(state
, struct ctdb_call
);
1265 CTDB_NO_MEMORY_NULL(ctdb
, state
->call
);
1267 state
->reqid
= ctdb_reqid_new(ctdb
, state
);
1268 state
->ctdb_db
= ctdb_db
;
1269 talloc_set_destructor(state
, ctdb_call_destructor
);
1271 len
= offsetof(struct ctdb_req_call
, data
) + call
->key
.dsize
+ call
->call_data
.dsize
;
1272 state
->c
= ctdb_transport_allocate(ctdb
, state
, CTDB_REQ_CALL
, len
,
1273 struct ctdb_req_call
);
1274 CTDB_NO_MEMORY_NULL(ctdb
, state
->c
);
1275 state
->c
->hdr
.destnode
= header
->dmaster
;
1277 /* this limits us to 16k outstanding messages - not unreasonable */
1278 state
->c
->hdr
.reqid
= state
->reqid
;
1279 state
->c
->flags
= call
->flags
;
1280 state
->c
->db_id
= ctdb_db
->db_id
;
1281 state
->c
->callid
= call
->call_id
;
1282 state
->c
->hopcount
= 0;
1283 state
->c
->keylen
= call
->key
.dsize
;
1284 state
->c
->calldatalen
= call
->call_data
.dsize
;
1285 memcpy(&state
->c
->data
[0], call
->key
.dptr
, call
->key
.dsize
);
1286 memcpy(&state
->c
->data
[call
->key
.dsize
],
1287 call
->call_data
.dptr
, call
->call_data
.dsize
);
1288 *(state
->call
) = *call
;
1289 state
->call
->call_data
.dptr
= &state
->c
->data
[call
->key
.dsize
];
1290 state
->call
->key
.dptr
= &state
->c
->data
[0];
1292 state
->state
= CTDB_CALL_WAIT
;
1293 state
->generation
= ctdb
->vnn_map
->generation
;
1295 DLIST_ADD(ctdb
->pending_calls
, state
);
1297 ctdb_queue_packet(ctdb
, &state
->c
->hdr
);
1303 make a remote ctdb call - async recv - called in daemon context
1305 This is called when the program wants to wait for a ctdb_call to complete and get the
1306 results. This call will block unless the call has already completed.
1308 int ctdb_daemon_call_recv(struct ctdb_call_state
*state
, struct ctdb_call
*call
)
1310 while (state
->state
< CTDB_CALL_DONE
) {
1311 event_loop_once(state
->ctdb_db
->ctdb
->ev
);
1313 if (state
->state
!= CTDB_CALL_DONE
) {
1314 ctdb_set_error(state
->ctdb_db
->ctdb
, "%s", state
->errmsg
);
1319 if (state
->call
->reply_data
.dsize
) {
1320 call
->reply_data
.dptr
= talloc_memdup(call
,
1321 state
->call
->reply_data
.dptr
,
1322 state
->call
->reply_data
.dsize
);
1323 call
->reply_data
.dsize
= state
->call
->reply_data
.dsize
;
1325 call
->reply_data
.dptr
= NULL
;
1326 call
->reply_data
.dsize
= 0;
1328 call
->status
= state
->call
->status
;
1335 send a keepalive packet to the other node
1337 void ctdb_send_keepalive(struct ctdb_context
*ctdb
, uint32_t destnode
)
1339 struct ctdb_req_keepalive
*r
;
1341 if (ctdb
->methods
== NULL
) {
1342 DEBUG(DEBUG_INFO
,(__location__
" Failed to send keepalive. Transport is DOWN\n"));
1346 r
= ctdb_transport_allocate(ctdb
, ctdb
, CTDB_REQ_KEEPALIVE
,
1347 sizeof(struct ctdb_req_keepalive
),
1348 struct ctdb_req_keepalive
);
1349 CTDB_NO_MEMORY_FATAL(ctdb
, r
);
1350 r
->hdr
.destnode
= destnode
;
1353 CTDB_INCREMENT_STAT(ctdb
, keepalive_packets_sent
);
1355 ctdb_queue_packet(ctdb
, &r
->hdr
);
1362 struct revokechild_deferred_call
{
1363 struct ctdb_context
*ctdb
;
1364 struct ctdb_req_header
*hdr
;
1365 deferred_requeue_fn fn
;
1369 struct revokechild_handle
{
1370 struct revokechild_handle
*next
, *prev
;
1371 struct ctdb_context
*ctdb
;
1372 struct ctdb_db_context
*ctdb_db
;
1373 struct fd_event
*fde
;
1380 struct revokechild_requeue_handle
{
1381 struct ctdb_context
*ctdb
;
1382 struct ctdb_req_header
*hdr
;
1383 deferred_requeue_fn fn
;
1387 static void deferred_call_requeue(struct event_context
*ev
, struct timed_event
*te
,
1388 struct timeval t
, void *private_data
)
1390 struct revokechild_requeue_handle
*requeue_handle
= talloc_get_type(private_data
, struct revokechild_requeue_handle
);
1392 requeue_handle
->fn(requeue_handle
->ctx
, requeue_handle
->hdr
);
1393 talloc_free(requeue_handle
);
1396 static int deferred_call_destructor(struct revokechild_deferred_call
*deferred_call
)
1398 struct ctdb_context
*ctdb
= deferred_call
->ctdb
;
1399 struct revokechild_requeue_handle
*requeue_handle
= talloc(ctdb
, struct revokechild_requeue_handle
);
1400 struct ctdb_req_call
*c
= (struct ctdb_req_call
*)deferred_call
->hdr
;
1402 requeue_handle
->ctdb
= ctdb
;
1403 requeue_handle
->hdr
= deferred_call
->hdr
;
1404 requeue_handle
->fn
= deferred_call
->fn
;
1405 requeue_handle
->ctx
= deferred_call
->ctx
;
1406 talloc_steal(requeue_handle
, requeue_handle
->hdr
);
1408 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1409 event_add_timed(ctdb
->ev
, requeue_handle
, timeval_current_ofs(c
->flags
& CTDB_WANT_READONLY
? 1 : 0, 0), deferred_call_requeue
, requeue_handle
);
1415 static int revokechild_destructor(struct revokechild_handle
*rc
)
1417 if (rc
->fde
!= NULL
) {
1418 talloc_free(rc
->fde
);
1421 if (rc
->fd
[0] != -1) {
1424 if (rc
->fd
[1] != -1) {
1427 ctdb_kill(rc
->ctdb
, rc
->child
, SIGKILL
);
1429 DLIST_REMOVE(rc
->ctdb_db
->revokechild_active
, rc
);
1433 static void revokechild_handler(struct event_context
*ev
, struct fd_event
*fde
,
1434 uint16_t flags
, void *private_data
)
1436 struct revokechild_handle
*rc
= talloc_get_type(private_data
,
1437 struct revokechild_handle
);
1441 ret
= read(rc
->fd
[0], &c
, 1);
1443 DEBUG(DEBUG_ERR
,("Failed to read status from revokechild. errno:%d\n", errno
));
1449 DEBUG(DEBUG_ERR
,("revokechild returned failure. status:%d\n", c
));
1458 struct ctdb_revoke_state
{
1459 struct ctdb_db_context
*ctdb_db
;
1461 struct ctdb_ltdb_header
*header
;
1468 static void update_record_cb(struct ctdb_client_control_state
*state
)
1470 struct ctdb_revoke_state
*revoke_state
;
1474 if (state
== NULL
) {
1477 revoke_state
= state
->async
.private_data
;
1479 state
->async
.fn
= NULL
;
1480 ret
= ctdb_control_recv(state
->ctdb
, state
, state
, NULL
, &res
, NULL
);
1481 if ((ret
!= 0) || (res
!= 0)) {
1482 DEBUG(DEBUG_ERR
,("Recv for revoke update record failed ret:%d res:%d\n", ret
, res
));
1483 revoke_state
->status
= -1;
1486 revoke_state
->count
--;
1487 if (revoke_state
->count
<= 0) {
1488 revoke_state
->finished
= 1;
1492 static void revoke_send_cb(struct ctdb_context
*ctdb
, uint32_t pnn
, void *private_data
)
1494 struct ctdb_revoke_state
*revoke_state
= private_data
;
1495 struct ctdb_client_control_state
*state
;
1497 state
= ctdb_ctrl_updaterecord_send(ctdb
, revoke_state
, timeval_current_ofs(5,0), pnn
, revoke_state
->ctdb_db
, revoke_state
->key
, revoke_state
->header
, revoke_state
->data
);
1498 if (state
== NULL
) {
1499 DEBUG(DEBUG_ERR
,("Failure to send update record to revoke readonly delegation\n"));
1500 revoke_state
->status
= -1;
1503 state
->async
.fn
= update_record_cb
;
1504 state
->async
.private_data
= revoke_state
;
1506 revoke_state
->count
++;
1510 static void ctdb_revoke_timeout_handler(struct event_context
*ev
, struct timed_event
*te
,
1511 struct timeval yt
, void *private_data
)
1513 struct ctdb_revoke_state
*state
= private_data
;
1515 DEBUG(DEBUG_ERR
,("Timed out waiting for revoke to finish\n"));
1516 state
->finished
= 1;
1520 static int ctdb_revoke_all_delegations(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA tdata
, TDB_DATA key
, struct ctdb_ltdb_header
*header
, TDB_DATA data
)
1522 struct ctdb_revoke_state
*state
= talloc_zero(ctdb
, struct ctdb_revoke_state
);
1525 state
->ctdb_db
= ctdb_db
;
1527 state
->header
= header
;
1530 ctdb_trackingdb_traverse(ctdb
, tdata
, revoke_send_cb
, state
);
1532 event_add_timed(ctdb
->ev
, state
, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler
, state
);
1534 while (state
->finished
== 0) {
1535 event_loop_once(ctdb
->ev
);
1538 status
= state
->status
;
1541 struct ctdb_ltdb_header new_header
;
1544 if (ctdb_ltdb_lock(ctdb_db
, key
) != 0) {
1545 DEBUG(DEBUG_ERR
,("Failed to chainlock the database in revokechild\n"));
1549 if (ctdb_ltdb_fetch(ctdb_db
, key
, &new_header
, state
, &new_data
) != 0) {
1550 ctdb_ltdb_unlock(ctdb_db
, key
);
1551 DEBUG(DEBUG_ERR
,("Failed for fetch tdb record in revokechild\n"));
1556 if (new_header
.rsn
> header
->rsn
) {
1557 ctdb_ltdb_unlock(ctdb_db
, key
);
1558 DEBUG(DEBUG_ERR
,("RSN too high in tdb record in revokechild\n"));
1562 if ( (new_header
.flags
& (CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
)) != (CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
) ) {
1563 ctdb_ltdb_unlock(ctdb_db
, key
);
1564 DEBUG(DEBUG_ERR
,("Flags are wrong in tdb record in revokechild\n"));
1569 new_header
.flags
|= CTDB_REC_RO_REVOKE_COMPLETE
;
1570 if (ctdb_ltdb_store(ctdb_db
, key
, &new_header
, new_data
) != 0) {
1571 ctdb_ltdb_unlock(ctdb_db
, key
);
1572 DEBUG(DEBUG_ERR
,("Failed to write new record in revokechild\n"));
1576 ctdb_ltdb_unlock(ctdb_db
, key
);
1584 int ctdb_start_revoke_ro_record(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_ltdb_header
*header
, TDB_DATA data
)
1587 struct revokechild_handle
*rc
;
1588 pid_t parent
= getpid();
1591 header
->flags
&= ~(CTDB_REC_RO_REVOKING_READONLY
|CTDB_REC_RO_HAVE_DELEGATIONS
|CTDB_REC_RO_HAVE_READONLY
);
1592 header
->flags
|= CTDB_REC_FLAG_MIGRATED_WITH_DATA
;
1595 if ((rc
= talloc_zero(ctdb_db
, struct revokechild_handle
)) == NULL
) {
1596 DEBUG(DEBUG_ERR
,("Failed to allocate revokechild_handle\n"));
1600 tdata
= tdb_fetch(ctdb_db
->rottdb
, key
);
1601 if (tdata
.dsize
> 0) {
1605 tdata
.dptr
= talloc_memdup(rc
, tdata
.dptr
, tdata
.dsize
);
1611 rc
->ctdb_db
= ctdb_db
;
1615 talloc_set_destructor(rc
, revokechild_destructor
);
1617 rc
->key
.dsize
= key
.dsize
;
1618 rc
->key
.dptr
= talloc_memdup(rc
, key
.dptr
, key
.dsize
);
1619 if (rc
->key
.dptr
== NULL
) {
1620 DEBUG(DEBUG_ERR
,("Failed to allocate key for revokechild_handle\n"));
1627 DEBUG(DEBUG_ERR
,("Failed to allocate key for revokechild_handle\n"));
1633 rc
->child
= ctdb_fork(ctdb
);
1634 if (rc
->child
== (pid_t
)-1) {
1635 DEBUG(DEBUG_ERR
,("Failed to fork child for revokechild\n"));
1640 if (rc
->child
== 0) {
1643 debug_extra
= talloc_asprintf(NULL
, "revokechild-%s:", ctdb_db
->db_name
);
1645 if (switch_from_server_to_client(ctdb
, "revokechild-%s", ctdb_db
->db_name
) != 0) {
1646 DEBUG(DEBUG_ERR
,("Failed to switch from server to client for revokechild process\n"));
1648 goto child_finished
;
1651 c
= ctdb_revoke_all_delegations(ctdb
, ctdb_db
, tdata
, key
, header
, data
);
1654 write(rc
->fd
[1], &c
, 1);
1655 /* make sure we die when our parent dies */
1656 while (ctdb_kill(ctdb
, parent
, 0) == 0 || errno
!= ESRCH
) {
1664 set_close_on_exec(rc
->fd
[0]);
1666 /* This is an active revokechild child process */
1667 DLIST_ADD_END(ctdb_db
->revokechild_active
, rc
, NULL
);
1669 rc
->fde
= event_add_fd(ctdb
->ev
, rc
, rc
->fd
[0],
1670 EVENT_FD_READ
, revokechild_handler
,
1672 if (rc
->fde
== NULL
) {
1673 DEBUG(DEBUG_ERR
,("Failed to set up fd event for revokechild process\n"));
1676 tevent_fd_set_auto_close(rc
->fde
);
1681 int ctdb_add_revoke_deferred_call(struct ctdb_context
*ctdb
, struct ctdb_db_context
*ctdb_db
, TDB_DATA key
, struct ctdb_req_header
*hdr
, deferred_requeue_fn fn
, void *call_context
)
1683 struct revokechild_handle
*rc
;
1684 struct revokechild_deferred_call
*deferred_call
;
1686 for (rc
= ctdb_db
->revokechild_active
; rc
; rc
= rc
->next
) {
1687 if (rc
->key
.dsize
== 0) {
1690 if (rc
->key
.dsize
!= key
.dsize
) {
1693 if (!memcmp(rc
->key
.dptr
, key
.dptr
, key
.dsize
)) {
1699 DEBUG(DEBUG_ERR
,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1703 deferred_call
= talloc(rc
, struct revokechild_deferred_call
);
1704 if (deferred_call
== NULL
) {
1705 DEBUG(DEBUG_ERR
,("Failed to allocate deferred call structure for revoking record\n"));
1709 deferred_call
->ctdb
= ctdb
;
1710 deferred_call
->hdr
= hdr
;
1711 deferred_call
->fn
= fn
;
1712 deferred_call
->ctx
= call_context
;
1714 talloc_set_destructor(deferred_call
, deferred_call_destructor
);
1715 talloc_steal(deferred_call
, hdr
);