ctdb-daemon: Fix CID 1364527/8/9: Null pointer dereferences (NULL_RETURNS)
[Samba.git] / ctdb / server / ctdb_call.c
1 /*
2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/util_process.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
38 #include "common/rb_tree.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
44 struct ctdb_sticky_record {
45 struct ctdb_context *ctdb;
46 struct ctdb_db_context *ctdb_db;
47 TDB_CONTEXT *pindown;
51 find the ctdb_db from a db index
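   Returns NULL if no database with the given id is attached; callers must check the result.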
53 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
55 struct ctdb_db_context *ctdb_db;
57 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
58 if (ctdb_db->db_id == id) {
59 break;
62 return ctdb_db;
66 a variant of the input packet handler that can be used in lock requeue
68 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
70 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
71 ctdb_input_pkt(ctdb, hdr);
76 send an error reply
78 static void ctdb_send_error(struct ctdb_context *ctdb,
79 struct ctdb_req_header *hdr, uint32_t status,
80 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
81 static void ctdb_send_error(struct ctdb_context *ctdb,
82 struct ctdb_req_header *hdr, uint32_t status,
83 const char *fmt, ...)
85 va_list ap;
86 struct ctdb_reply_error_old *r;
87 char *msg;
88 int msglen, len;
90 if (ctdb->methods == NULL) {
91 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
92 return;
95 va_start(ap, fmt);
96 msg = talloc_vasprintf(ctdb, fmt, ap);
97 if (msg == NULL) {
98 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
100 va_end(ap);
102 msglen = strlen(msg)+1;
103 len = offsetof(struct ctdb_reply_error_old, msg);
104 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
105 struct ctdb_reply_error_old);
106 CTDB_NO_MEMORY_FATAL(ctdb, r);
108 r->hdr.destnode = hdr->srcnode;
109 r->hdr.reqid = hdr->reqid;
110 r->status = status;
111 r->msglen = msglen;
112 memcpy(&r->msg[0], msg, msglen);
114 ctdb_queue_packet(ctdb, &r->hdr);
116 talloc_free(msg);
121 * send a redirect reply
123 * The logic behind this function is this:
125 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
126 * to its local ctdb (ctdb_request_call). If the node is not itself
127 * the record's DMASTER, it first redirects the packet to the
128 * record's LMASTER. The LMASTER then redirects the call packet to
129 * the current DMASTER. Note that this works because, when a record
130 * is migrated off a node, the new DMASTER is stored in the record's
131 * copy on the former DMASTER.
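 * Note: a request is redirected at most twice per attempt (first to the
 * lmaster, then to the dmaster recorded there), but the record can keep
 * migrating while packets are in flight, so the hopcount can still grow;
 * hence the high-hopcount warning below.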
133 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
134 struct ctdb_db_context *ctdb_db,
135 TDB_DATA key,
136 struct ctdb_req_call_old *c,
137 struct ctdb_ltdb_header *header)
139 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
141 c->hdr.destnode = lmaster;
142 if (ctdb->pnn == lmaster) {
143 c->hdr.destnode = header->dmaster;
145 c->hopcount++;
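/* Warn only for a few hop counts out of every hundred so a hot record
   does not flood the log. */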
147 if (c->hopcount%100 > 95) {
148 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
149 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
150 "header->dmaster:%d dst:%d\n",
151 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
152 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
153 header->dmaster, c->hdr.destnode));
156 ctdb_queue_packet(ctdb, &c->hdr);
161 send a dmaster reply
163 caller must have the chainlock before calling this routine. Caller must be
164 the lmaster
166 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
167 struct ctdb_ltdb_header *header,
168 TDB_DATA key, TDB_DATA data,
169 uint32_t new_dmaster,
170 uint32_t reqid)
172 struct ctdb_context *ctdb = ctdb_db->ctdb;
173 struct ctdb_reply_dmaster_old *r;
174 int ret, len;
175 TALLOC_CTX *tmp_ctx;
177 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
178 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
179 return;
182 header->dmaster = new_dmaster;
183 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
184 if (ret != 0) {
185 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
186 return;
189 if (ctdb->methods == NULL) {
190 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply can't update dmaster since transport is down");
191 return;
194 /* put the packet on a temporary context, allowing us to safely free
195 it below even if ctdb_reply_dmaster() has freed it already */
196 tmp_ctx = talloc_new(ctdb);
198 /* send the CTDB_REPLY_DMASTER */
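/* Payload layout: key bytes, then data bytes, then a trailing uint32_t
   carrying the record flags. */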
199 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
200 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
201 struct ctdb_reply_dmaster_old);
202 CTDB_NO_MEMORY_FATAL(ctdb, r);
204 r->hdr.destnode = new_dmaster;
205 r->hdr.reqid = reqid;
206 r->hdr.generation = ctdb_db->generation;
207 r->rsn = header->rsn;
208 r->keylen = key.dsize;
209 r->datalen = data.dsize;
210 r->db_id = ctdb_db->db_id;
211 memcpy(&r->data[0], key.dptr, key.dsize);
212 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
213 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
215 ctdb_queue_packet(ctdb, &r->hdr);
217 talloc_free(tmp_ctx);
221 send a dmaster request (give another node the dmaster for a record)
223 This is always sent to the lmaster, which ensures that the lmaster
224 always knows who the dmaster is. The lmaster will then send a
225 CTDB_REPLY_DMASTER to the new dmaster
227 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
228 struct ctdb_req_call_old *c,
229 struct ctdb_ltdb_header *header,
230 TDB_DATA *key, TDB_DATA *data)
232 struct ctdb_req_dmaster_old *r;
233 struct ctdb_context *ctdb = ctdb_db->ctdb;
234 int len;
235 uint32_t lmaster = ctdb_lmaster(ctdb, key);
237 if (ctdb->methods == NULL) {
238 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
239 return;
242 if (data->dsize != 0) {
243 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
246 if (lmaster == ctdb->pnn) {
247 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
248 c->hdr.srcnode, c->hdr.reqid);
249 return;
252 len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
253 + sizeof(uint32_t);
254 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
255 struct ctdb_req_dmaster_old);
256 CTDB_NO_MEMORY_FATAL(ctdb, r);
257 r->hdr.destnode = lmaster;
258 r->hdr.reqid = c->hdr.reqid;
259 r->hdr.generation = ctdb_db->generation;
260 r->db_id = c->db_id;
261 r->rsn = header->rsn;
262 r->dmaster = c->hdr.srcnode;
263 r->keylen = key->dsize;
264 r->datalen = data->dsize;
265 memcpy(&r->data[0], key->dptr, key->dsize);
266 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
267 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
269 header->dmaster = c->hdr.srcnode;
270 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
271 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
274 ctdb_queue_packet(ctdb, &r->hdr);
276 talloc_free(r);
279 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
280 struct tevent_timer *te,
281 struct timeval t, void *private_data)
283 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
284 struct ctdb_sticky_record);
286 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
287 if (sr->pindown != NULL) {
288 talloc_free(sr->pindown);
289 sr->pindown = NULL;
293 static int
294 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
296 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
297 uint32_t *k;
298 struct ctdb_sticky_record *sr;
300 k = ctdb_key_to_idkey(tmp_ctx, key);
301 if (k == NULL) {
302 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
303 talloc_free(tmp_ctx);
304 return -1;
307 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
308 if (sr == NULL) {
309 talloc_free(tmp_ctx);
310 return 0;
313 talloc_free(tmp_ctx);
315 if (sr->pindown == NULL) {
316 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
317 sr->pindown = talloc_new(sr);
318 if (sr->pindown == NULL) {
319 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
320 return -1;
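/* sticky_pindown is in milliseconds; split it into seconds and
   microseconds for the timer below. */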
322 tevent_add_timer(ctdb->ev, sr->pindown,
323 timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
324 (ctdb->tunable.sticky_pindown * 1000) % 1000000),
325 ctdb_sticky_pindown_timeout, sr);
328 return 0;
332 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
333 gets a CTDB_REQ_DMASTER for itself. We become the dmaster.
335 must be called with the chainlock held. This function releases the chainlock
337 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
338 struct ctdb_req_header *hdr,
339 TDB_DATA key, TDB_DATA data,
340 uint64_t rsn, uint32_t record_flags)
342 struct ctdb_call_state *state;
343 struct ctdb_context *ctdb = ctdb_db->ctdb;
344 struct ctdb_ltdb_header header;
345 int ret;
347 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
349 ZERO_STRUCT(header);
350 header.rsn = rsn;
351 header.dmaster = ctdb->pnn;
352 header.flags = record_flags;
354 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
356 if (state) {
357 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
359 * We temporarily add the VACUUM_MIGRATED flag to
360 * the record flags, so that ctdb_ltdb_store can
361 * decide whether the record should be stored or
362 * deleted.
364 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
368 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
369 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
371 ret = ctdb_ltdb_unlock(ctdb_db, key);
372 if (ret != 0) {
373 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
375 return;
378 /* we just became DMASTER and this database is "sticky",
379 see if the record is flagged as "hot" and set up a pin-down
380 context to stop migrations for a little while if so
382 if (ctdb_db->sticky) {
383 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
386 if (state == NULL) {
387 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
388 ctdb->pnn, hdr->reqid, hdr->srcnode));
390 ret = ctdb_ltdb_unlock(ctdb_db, key);
391 if (ret != 0) {
392 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
394 return;
397 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
398 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
400 ret = ctdb_ltdb_unlock(ctdb_db, key);
401 if (ret != 0) {
402 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
404 return;
407 if (hdr->reqid != state->reqid) {
408 /* we found a record but it was the wrong one */
409 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
411 ret = ctdb_ltdb_unlock(ctdb_db, key);
412 if (ret != 0) {
413 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
415 return;
418 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
420 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
421 if (ret != 0) {
422 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
425 state->state = CTDB_CALL_DONE;
426 if (state->async.fn) {
427 state->async.fn(state);
431 struct dmaster_defer_call {
432 struct dmaster_defer_call *next, *prev;
433 struct ctdb_context *ctdb;
434 struct ctdb_req_header *hdr;
437 struct dmaster_defer_queue {
438 struct ctdb_db_context *ctdb_db;
439 uint32_t generation;
440 struct dmaster_defer_call *deferred_calls;
443 static void dmaster_defer_reprocess(struct tevent_context *ev,
444 struct tevent_timer *te,
445 struct timeval t,
446 void *private_data)
448 struct dmaster_defer_call *call = talloc_get_type(
449 private_data, struct dmaster_defer_call);
451 ctdb_input_pkt(call->ctdb, call->hdr);
452 talloc_free(call);
455 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
457 /* Ignore the requests if a database recovery happened in between. */
458 if (ddq->generation != ddq->ctdb_db->generation) {
459 return 0;
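/* Hand each deferred packet back to the input path via a zero-delay
   timer event so it is reprocessed from the main event loop. */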
462 while (ddq->deferred_calls != NULL) {
463 struct dmaster_defer_call *call = ddq->deferred_calls;
465 DLIST_REMOVE(ddq->deferred_calls, call);
467 talloc_steal(call->ctdb, call);
468 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
469 dmaster_defer_reprocess, call);
471 return 0;
474 static void *insert_ddq_callback(void *parm, void *data)
476 if (data) {
477 talloc_free(data);
479 return parm;
483 * This function is used to register a key in the database that needs to be updated.
484 * Any requests for that key should get deferred till this is completed.
486 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
487 struct ctdb_req_header *hdr,
488 TDB_DATA key)
490 uint32_t *k;
491 struct dmaster_defer_queue *ddq;
493 k = ctdb_key_to_idkey(hdr, key);
494 if (k == NULL) {
495 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
496 return -1;
499 /* Already exists */
500 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
501 if (ddq != NULL) {
502 if (ddq->generation == ctdb_db->generation) {
503 talloc_free(k);
504 return 0;
507 /* Recovery occurred - get rid of the old queue. All the deferred
508 * requests will be resent anyway from ctdb_call_resend_db.
510 talloc_free(ddq);
513 ddq = talloc(hdr, struct dmaster_defer_queue);
514 if (ddq == NULL) {
515 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
516 talloc_free(k);
517 return -1;
519 ddq->ctdb_db = ctdb_db;
520 ddq->generation = hdr->generation;
521 ddq->deferred_calls = NULL;
523 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
524 insert_ddq_callback, ddq);
525 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
527 talloc_free(k);
528 return 0;
531 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
532 struct ctdb_req_header *hdr,
533 TDB_DATA key)
535 struct dmaster_defer_queue *ddq;
536 struct dmaster_defer_call *call;
537 uint32_t *k;
539 k = ctdb_key_to_idkey(hdr, key);
540 if (k == NULL) {
541 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
542 return -1;
545 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
546 if (ddq == NULL) {
547 talloc_free(k);
548 return -1;
551 talloc_free(k);
553 if (ddq->generation != hdr->generation) {
554 talloc_set_destructor(ddq, NULL);
555 talloc_free(ddq);
556 return -1;
559 call = talloc(ddq, struct dmaster_defer_call);
560 if (call == NULL) {
561 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
562 return -1;
565 call->ctdb = ctdb_db->ctdb;
566 call->hdr = talloc_steal(call, hdr);
568 DLIST_ADD_END(ddq->deferred_calls, call);
570 return 0;
574 called when a CTDB_REQ_DMASTER packet comes in
576 this comes into the lmaster for a record when the current dmaster
577 wants to give up the dmaster role and give it to someone else
579 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
581 struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
582 TDB_DATA key, data, data2;
583 struct ctdb_ltdb_header header;
584 struct ctdb_db_context *ctdb_db;
585 uint32_t record_flags = 0;
586 size_t len;
587 int ret;
589 key.dptr = c->data;
590 key.dsize = c->keylen;
591 data.dptr = c->data + c->keylen;
592 data.dsize = c->datalen;
593 len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
594 + sizeof(uint32_t);
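/* The trailing record-flags word is only copied if the packet is long
   enough to contain it; otherwise record_flags stays 0. */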
595 if (len <= c->hdr.length) {
596 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
597 sizeof(record_flags));
600 ctdb_db = find_ctdb_db(ctdb, c->db_id);
601 if (!ctdb_db) {
602 ctdb_send_error(ctdb, hdr, -1,
603 "Unknown database in request. db_id==0x%08x",
604 c->db_id);
605 return;
608 dmaster_defer_setup(ctdb_db, hdr, key);
610 /* fetch the current record */
611 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
612 ctdb_call_input_pkt, ctdb, false);
613 if (ret == -1) {
614 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
615 return;
617 if (ret == -2) {
618 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
619 return;
622 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
623 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
624 "db=%s lmaster=%u gen=%u curgen=%u\n",
625 ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
626 hdr->generation, ctdb_db->generation));
627 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
630 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
631 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
633 /* it's a protocol error if the sending node is not the current dmaster */
634 if (header.dmaster != hdr->srcnode) {
635 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
636 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
637 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
638 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
639 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
640 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
641 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
643 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
644 ctdb_ltdb_unlock(ctdb_db, key);
645 return;
649 if (header.rsn > c->rsn) {
650 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
651 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
652 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
653 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
656 /* use the rsn from the sending node */
657 header.rsn = c->rsn;
659 /* store the record flags from the sending node */
660 header.flags = record_flags;
662 /* check if the new dmaster is the lmaster, in which case we
663 skip the dmaster reply */
664 if (c->dmaster == ctdb->pnn) {
665 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
666 } else {
667 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
669 ret = ctdb_ltdb_unlock(ctdb_db, key);
670 if (ret != 0) {
671 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
676 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
677 struct tevent_timer *te,
678 struct timeval t, void *private_data)
680 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
681 struct ctdb_sticky_record);
682 talloc_free(sr);
685 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
687 if (data) {
688 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
689 talloc_free(data);
691 return parm;
694 static int
695 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
697 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
698 uint32_t *k;
699 struct ctdb_sticky_record *sr;
701 k = ctdb_key_to_idkey(tmp_ctx, key);
702 if (k == NULL) {
703 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
704 talloc_free(tmp_ctx);
705 return -1;
708 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
709 if (sr != NULL) {
710 talloc_free(tmp_ctx);
711 return 0;
714 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
715 if (sr == NULL) {
716 talloc_free(tmp_ctx);
717 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
718 return -1;
721 sr->ctdb = ctdb;
722 sr->ctdb_db = ctdb_db;
723 sr->pindown = NULL;
725 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
726 ctdb->tunable.sticky_duration,
727 ctdb_db->db_name, ctdb_hash(&key)));
729 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
731 tevent_add_timer(ctdb->ev, sr,
732 timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
733 ctdb_sticky_record_timeout, sr);
735 talloc_free(tmp_ctx);
736 return 0;
739 struct pinned_down_requeue_handle {
740 struct ctdb_context *ctdb;
741 struct ctdb_req_header *hdr;
744 struct pinned_down_deferred_call {
745 struct ctdb_context *ctdb;
746 struct ctdb_req_header *hdr;
749 static void pinned_down_requeue(struct tevent_context *ev,
750 struct tevent_timer *te,
751 struct timeval t, void *private_data)
753 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
754 struct ctdb_context *ctdb = handle->ctdb;
756 talloc_steal(ctdb, handle->hdr);
757 ctdb_call_input_pkt(ctdb, handle->hdr);
759 talloc_free(handle);
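/* Destructor for a deferred pinned-down call: when the pin-down context
   is freed, hand the saved packet back to the input path via an
   immediate timer event. */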
762 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
764 struct ctdb_context *ctdb = pinned_down->ctdb;
765 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
767 handle->ctdb = pinned_down->ctdb;
768 handle->hdr = pinned_down->hdr;
769 talloc_steal(handle, handle->hdr);
771 tevent_add_timer(ctdb->ev, handle, timeval_zero(),
772 pinned_down_requeue, handle);
774 return 0;
777 static int
778 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
780 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
781 uint32_t *k;
782 struct ctdb_sticky_record *sr;
783 struct pinned_down_deferred_call *pinned_down;
785 k = ctdb_key_to_idkey(tmp_ctx, key);
786 if (k == NULL) {
787 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
788 talloc_free(tmp_ctx);
789 return -1;
792 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
793 if (sr == NULL) {
794 talloc_free(tmp_ctx);
795 return -1;
798 talloc_free(tmp_ctx);
800 if (sr->pindown == NULL) {
801 return -1;
804 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
805 if (pinned_down == NULL) {
806 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
807 return -1;
810 pinned_down->ctdb = ctdb;
811 pinned_down->hdr = hdr;
813 talloc_set_destructor(pinned_down, pinned_down_destructor);
814 talloc_steal(pinned_down, hdr);
816 return 0;
819 static void
820 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
822 int i, id;
824 /* smallest value is always at index 0 */
825 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
826 return;
829 /* see if we already know this key */
830 for (i = 0; i < MAX_HOT_KEYS; i++) {
831 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
832 continue;
834 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
835 continue;
837 /* found an entry for this key */
838 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
839 return;
841 ctdb_db->statistics.hot_keys[i].count = hopcount;
842 goto sort_keys;
845 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
846 id = ctdb_db->statistics.num_hot_keys;
847 ctdb_db->statistics.num_hot_keys++;
848 } else {
849 id = 0;
852 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
853 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
855 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
856 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
857 ctdb_db->statistics.hot_keys[id].count = hopcount;
858 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
859 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
861 sort_keys:
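/* Keep the entry with the smallest hop count at index 0 so the cheap
   early-return check at the top of this function stays valid. */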
862 for (i = 1; i < MAX_HOT_KEYS; i++) {
863 if (ctdb_db->statistics.hot_keys[i].count == 0) {
864 continue;
866 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
867 hopcount = ctdb_db->statistics.hot_keys[i].count;
868 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
869 ctdb_db->statistics.hot_keys[0].count = hopcount;
871 key = ctdb_db->statistics.hot_keys[i].key;
872 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
873 ctdb_db->statistics.hot_keys[0].key = key;
879 called when a CTDB_REQ_CALL packet comes in
881 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
883 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
884 TDB_DATA data;
885 struct ctdb_reply_call_old *r;
886 int ret, len;
887 struct ctdb_ltdb_header header;
888 struct ctdb_call *call;
889 struct ctdb_db_context *ctdb_db;
890 int tmp_count, bucket;
892 if (ctdb->methods == NULL) {
893 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
894 return;
898 ctdb_db = find_ctdb_db(ctdb, c->db_id);
899 if (!ctdb_db) {
900 ctdb_send_error(ctdb, hdr, -1,
901 "Unknown database in request. db_id==0x%08x",
902 c->db_id);
903 return;
906 call = talloc(hdr, struct ctdb_call);
907 CTDB_NO_MEMORY_FATAL(ctdb, call);
909 call->call_id = c->callid;
910 call->key.dptr = c->data;
911 call->key.dsize = c->keylen;
912 call->call_data.dptr = c->data + c->keylen;
913 call->call_data.dsize = c->calldatalen;
914 call->reply_data.dptr = NULL;
915 call->reply_data.dsize = 0;
918 /* If this record is pinned down we should defer the
919 request until the pindown times out
921 if (ctdb_db->sticky) {
922 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
923 DEBUG(DEBUG_WARNING,
924 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
925 talloc_free(call);
926 return;
930 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
931 talloc_free(call);
932 return;
935 /* determine if we are the dmaster for this key. This also
936 fetches the record data (if any), thus avoiding a 2nd fetch of the data
937 if the call will be answered locally */
939 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
940 ctdb_call_input_pkt, ctdb, false);
941 if (ret == -1) {
942 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
943 talloc_free(call);
944 return;
946 if (ret == -2) {
947 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
948 talloc_free(call);
949 return;
952 /* Don't do READONLY if we don't have a tracking database */
953 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
954 c->flags &= ~CTDB_WANT_READONLY;
957 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
958 header.flags &= ~CTDB_REC_RO_FLAGS;
959 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
960 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
961 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
962 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
964 /* and clear out the tracking data */
965 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
966 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
970 /* if we are revoking, we must defer all other calls until the revoke
971 * has completed.
973 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
974 talloc_free(data.dptr);
975 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
977 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
978 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
980 talloc_free(call);
981 return;
985 * If we are not the dmaster and are not hosting any delegations,
986 * then we redirect the request to the node that can answer it
987 * (the lmaster or the dmaster).
989 if ((header.dmaster != ctdb->pnn)
990 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
991 talloc_free(data.dptr);
992 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
994 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
995 if (ret != 0) {
996 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
998 talloc_free(call);
999 return;
1002 if ( (!(c->flags & CTDB_WANT_READONLY))
1003 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1004 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1005 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1006 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1008 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1010 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1011 ctdb_fatal(ctdb, "Failed to start record revoke");
1013 talloc_free(data.dptr);
1015 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1016 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1018 talloc_free(call);
1020 return;
1023 /* If this is the first request for a delegation, bump the rsn and set
1024 * the delegations flag
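 * Note: the copy stored here gets rsn+3 while the read-only copy handed
 * to the caller below gets rsn+1, so the dmaster's copy stays ahead.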
1026 if ((c->flags & CTDB_WANT_READONLY)
1027 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1028 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1029 header.rsn += 3;
1030 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1031 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1032 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1035 if ((c->flags & CTDB_WANT_READONLY)
1036 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1037 TDB_DATA tdata;
1039 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1040 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1041 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1043 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1044 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1046 free(tdata.dptr);
1048 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1049 if (ret != 0) {
1050 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1053 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1054 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1055 struct ctdb_reply_call_old);
1056 CTDB_NO_MEMORY_FATAL(ctdb, r);
1057 r->hdr.destnode = c->hdr.srcnode;
1058 r->hdr.reqid = c->hdr.reqid;
1059 r->hdr.generation = ctdb_db->generation;
1060 r->status = 0;
1061 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
1062 header.rsn -= 2;
1063 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1064 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1065 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1067 if (data.dsize) {
1068 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1071 ctdb_queue_packet(ctdb, &r->hdr);
1072 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1073 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1075 talloc_free(r);
1076 talloc_free(call);
1077 return;
1080 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
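/* Histogram the hop count into logarithmic buckets: bucket 0 covers 0
   hops, bucket 1 covers 1-3, bucket 2 covers 4-15, and so on (each
   bucket spans a factor of four). */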
1081 tmp_count = c->hopcount;
1082 bucket = 0;
1083 while (tmp_count) {
1084 tmp_count >>= 2;
1085 bucket++;
1087 if (bucket >= MAX_COUNT_BUCKETS) {
1088 bucket = MAX_COUNT_BUCKETS - 1;
1090 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1091 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1092 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1094 /* If this database supports sticky records, then check if the
1095 hopcount is big. If it is, it means the record is hot and we
1096 should make it sticky.
1098 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1099 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1103 /* If possible, try to migrate the record to the caller's node.
1104 * From the client's perspective a fetch of the data is just as
1105 * expensive as a migration.
1107 if (c->hdr.srcnode != ctdb->pnn) {
1108 if (ctdb_db->persistent_state) {
1109 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1110 " of key %s while transaction is active\n",
1111 (char *)call->key.dptr));
1112 } else {
1113 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1114 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1115 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1116 talloc_free(data.dptr);
1118 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1119 if (ret != 0) {
1120 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1123 talloc_free(call);
1124 return;
1127 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1128 if (ret != 0) {
1129 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1130 call->status = -1;
1133 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1134 if (ret != 0) {
1135 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1138 len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1139 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1140 struct ctdb_reply_call_old);
1141 CTDB_NO_MEMORY_FATAL(ctdb, r);
1142 r->hdr.destnode = hdr->srcnode;
1143 r->hdr.reqid = hdr->reqid;
1144 r->hdr.generation = ctdb_db->generation;
1145 r->status = call->status;
1146 r->datalen = call->reply_data.dsize;
1147 if (call->reply_data.dsize) {
1148 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1151 ctdb_queue_packet(ctdb, &r->hdr);
1153 talloc_free(r);
1154 talloc_free(call);
1158 * called when a CTDB_REPLY_CALL packet comes in
1160 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1161 * contains any reply data from the call
1163 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1165 struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1166 struct ctdb_call_state *state;
1168 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1169 if (state == NULL) {
1170 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1171 return;
1174 if (hdr->reqid != state->reqid) {
1175 /* we found a record but it was the wrong one */
1176 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1177 return;
1181 /* read only delegation processing */
1182 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1183 * delegation since we may need to update the record header
1185 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1186 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1187 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1188 struct ctdb_ltdb_header oldheader;
1189 TDB_DATA key, data, olddata;
1190 int ret;
1192 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1193 goto finished_ro;
1197 key.dsize = state->c->keylen;
1198 key.dptr = state->c->data;
1199 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1200 ctdb_call_input_pkt, ctdb, false);
1201 if (ret == -2) {
1202 return;
1204 if (ret != 0) {
1205 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1206 return;
1209 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1210 if (ret != 0) {
1211 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1212 ctdb_ltdb_unlock(ctdb_db, key);
1213 goto finished_ro;
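/* Only overwrite the local copy if the received header carries a
   strictly newer revision. */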
1216 if (header->rsn <= oldheader.rsn) {
1217 ctdb_ltdb_unlock(ctdb_db, key);
1218 goto finished_ro;
1221 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1222 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1223 ctdb_ltdb_unlock(ctdb_db, key);
1224 goto finished_ro;
1227 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1228 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1229 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1230 if (ret != 0) {
1231 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1232 ctdb_ltdb_unlock(ctdb_db, key);
1233 goto finished_ro;
1236 ctdb_ltdb_unlock(ctdb_db, key);
1238 finished_ro:
1240 state->call->reply_data.dptr = c->data;
1241 state->call->reply_data.dsize = c->datalen;
1242 state->call->status = c->status;
1244 talloc_steal(state, c);
1246 state->state = CTDB_CALL_DONE;
1247 if (state->async.fn) {
1248 state->async.fn(state);
1254 * called when a CTDB_REPLY_DMASTER packet comes in
1256 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1257 * request packet. It means that the current dmaster wants to give us
1258 * the dmaster role.
1260 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1262 struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1263 struct ctdb_db_context *ctdb_db;
1264 TDB_DATA key, data;
1265 uint32_t record_flags = 0;
1266 size_t len;
1267 int ret;
1269 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1270 if (ctdb_db == NULL) {
1271 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1272 return;
1275 key.dptr = c->data;
1276 key.dsize = c->keylen;
1277 data.dptr = &c->data[key.dsize];
1278 data.dsize = c->datalen;
1279 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1280 + sizeof(uint32_t);
1281 if (len <= c->hdr.length) {
1282 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1283 sizeof(record_flags));
1286 dmaster_defer_setup(ctdb_db, hdr, key);
1288 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1289 ctdb_call_input_pkt, ctdb, false);
1290 if (ret == -2) {
1291 return;
1293 if (ret != 0) {
1294 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1295 return;
1298 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1303 called when a CTDB_REPLY_ERROR packet comes in
1305 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1307 struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1308 struct ctdb_call_state *state;
1310 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1311 if (state == NULL) {
1312 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1313 ctdb->pnn, hdr->reqid));
1314 return;
1317 if (hdr->reqid != state->reqid) {
1318 /* we found a record but it was the wrong one */
1319 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1320 return;
1323 talloc_steal(state, c);
1325 state->state = CTDB_CALL_ERROR;
1326 state->errmsg = (char *)c->msg;
1327 if (state->async.fn) {
1328 state->async.fn(state);
1334 destroy a ctdb_call
1336 static int ctdb_call_destructor(struct ctdb_call_state *state)
1338 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1339 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1340 return 0;
1345 called when a ctdb_call needs to be resent after a reconfigure event
1347 static void ctdb_call_resend(struct ctdb_call_state *state)
1349 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1351 state->generation = state->ctdb_db->generation;
1353 /* use a new reqid, in case the old reply does eventually come in */
1354 reqid_remove(ctdb->idr, state->reqid);
1355 state->reqid = reqid_new(ctdb->idr, state);
1356 state->c->hdr.reqid = state->reqid;
1358 /* update the generation count for this request, so its valid with the new vnn_map */
1359 state->c->hdr.generation = state->generation;
1361 /* send the packet to ourselves, it will be redirected appropriately */
1362 state->c->hdr.destnode = ctdb->pnn;
1364 ctdb_queue_packet(ctdb, &state->c->hdr);
1365 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1366 state->ctdb_db->db_name, state->reqid, state->generation));
1370 resend all pending calls on recovery
1372 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1374 struct ctdb_call_state *state, *next;
1376 for (state = ctdb_db->pending_calls; state; state = next) {
1377 next = state->next;
1378 ctdb_call_resend(state);
1382 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1384 struct ctdb_db_context *ctdb_db;
1386 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1387 ctdb_call_resend_db(ctdb_db);
1392 this allows the caller to set up an async.fn
1394 static void call_local_trigger(struct tevent_context *ev,
1395 struct tevent_timer *te,
1396 struct timeval t, void *private_data)
1398 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1399 if (state->async.fn) {
1400 state->async.fn(state);
1406 construct an event driven local ctdb_call
1408 this is used so that locally processed ctdb_call requests are processed
1409 in an event driven manner
1411 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1412 struct ctdb_call *call,
1413 struct ctdb_ltdb_header *header,
1414 TDB_DATA *data)
1416 struct ctdb_call_state *state;
1417 struct ctdb_context *ctdb = ctdb_db->ctdb;
1418 int ret;
1420 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1421 CTDB_NO_MEMORY_NULL(ctdb, state);
1423 talloc_steal(state, data->dptr);
1425 state->state = CTDB_CALL_DONE;
1426 state->call = talloc(state, struct ctdb_call);
1427 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1428 *(state->call) = *call;
1429 state->ctdb_db = ctdb_db;
1431 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1432 if (ret != 0) {
1433 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1436 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1437 call_local_trigger, state);
1439 return state;
1444 make a remote ctdb call - async send. Called in daemon context.
1446 This constructs a ctdb_call request and queues it for processing.
1447 This call never blocks.
1449 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1450 struct ctdb_call *call,
1451 struct ctdb_ltdb_header *header)
1453 uint32_t len;
1454 struct ctdb_call_state *state;
1455 struct ctdb_context *ctdb = ctdb_db->ctdb;
1457 if (ctdb->methods == NULL) {
1458 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1459 return NULL;
1462 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1463 CTDB_NO_MEMORY_NULL(ctdb, state);
1464 state->call = talloc(state, struct ctdb_call);
1465 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1467 state->reqid = reqid_new(ctdb->idr, state);
1468 state->ctdb_db = ctdb_db;
1469 talloc_set_destructor(state, ctdb_call_destructor);
1471 len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
1472 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1473 struct ctdb_req_call_old);
1474 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1475 state->c->hdr.destnode = header->dmaster;
1477 /* this limits us to 16k outstanding messages - not unreasonable */
1478 state->c->hdr.reqid = state->reqid;
1479 state->c->hdr.generation = ctdb_db->generation;
1480 state->c->flags = call->flags;
1481 state->c->db_id = ctdb_db->db_id;
1482 state->c->callid = call->call_id;
1483 state->c->hopcount = 0;
1484 state->c->keylen = call->key.dsize;
1485 state->c->calldatalen = call->call_data.dsize;
1486 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1487 memcpy(&state->c->data[call->key.dsize],
1488 call->call_data.dptr, call->call_data.dsize);
1489 *(state->call) = *call;
1490 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1491 state->call->key.dptr = &state->c->data[0];
1493 state->state = CTDB_CALL_WAIT;
1494 state->generation = ctdb_db->generation;
1496 DLIST_ADD(ctdb_db->pending_calls, state);
1498 ctdb_queue_packet(ctdb, &state->c->hdr);
1500 return state;
1504 make a remote ctdb call - async recv - called in daemon context
1506 This is called when the program wants to wait for a ctdb_call to complete and get the
1507 results. This call will block unless the call has already completed.
1509 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1511 while (state->state < CTDB_CALL_DONE) {
1512 tevent_loop_once(state->ctdb_db->ctdb->ev);
1514 if (state->state != CTDB_CALL_DONE) {
1515 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1516 talloc_free(state);
1517 return -1;
1520 if (state->call->reply_data.dsize) {
1521 call->reply_data.dptr = talloc_memdup(call,
1522 state->call->reply_data.dptr,
1523 state->call->reply_data.dsize);
1524 call->reply_data.dsize = state->call->reply_data.dsize;
1525 } else {
1526 call->reply_data.dptr = NULL;
1527 call->reply_data.dsize = 0;
1529 call->status = state->call->status;
1530 talloc_free(state);
1531 return 0;
1536 send a keepalive packet to the other node
1538 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1540 struct ctdb_req_keepalive_old *r;
1542 if (ctdb->methods == NULL) {
1543 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1544 return;
1547 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1548 sizeof(struct ctdb_req_keepalive_old),
1549 struct ctdb_req_keepalive_old);
1550 CTDB_NO_MEMORY_FATAL(ctdb, r);
1551 r->hdr.destnode = destnode;
1552 r->hdr.reqid = 0;
1554 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1556 ctdb_queue_packet(ctdb, &r->hdr);
1558 talloc_free(r);
1563 struct revokechild_deferred_call {
1564 struct ctdb_context *ctdb;
1565 struct ctdb_req_header *hdr;
1566 deferred_requeue_fn fn;
1567 void *ctx;
1570 struct revokechild_handle {
1571 struct revokechild_handle *next, *prev;
1572 struct ctdb_context *ctdb;
1573 struct ctdb_db_context *ctdb_db;
1574 struct tevent_fd *fde;
1575 int status;
1576 int fd[2];
1577 pid_t child;
1578 TDB_DATA key;
1581 struct revokechild_requeue_handle {
1582 struct ctdb_context *ctdb;
1583 struct ctdb_req_header *hdr;
1584 deferred_requeue_fn fn;
1585 void *ctx;
1588 static void deferred_call_requeue(struct tevent_context *ev,
1589 struct tevent_timer *te,
1590 struct timeval t, void *private_data)
1592 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1594 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1595 talloc_free(requeue_handle);
1598 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1600 struct ctdb_context *ctdb = deferred_call->ctdb;
1601 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1602 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)deferred_call->hdr;
1604 requeue_handle->ctdb = ctdb;
1605 requeue_handle->hdr = deferred_call->hdr;
1606 requeue_handle->fn = deferred_call->fn;
1607 requeue_handle->ctx = deferred_call->ctx;
1608 talloc_steal(requeue_handle, requeue_handle->hdr);
1610 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1611 tevent_add_timer(ctdb->ev, requeue_handle,
1612 timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0),
1613 deferred_call_requeue, requeue_handle);
1615 return 0;
1619 static int revokechild_destructor(struct revokechild_handle *rc)
1621 if (rc->fde != NULL) {
1622 talloc_free(rc->fde);
1625 if (rc->fd[0] != -1) {
1626 close(rc->fd[0]);
1628 if (rc->fd[1] != -1) {
1629 close(rc->fd[1]);
1631 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1633 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1634 return 0;
1637 static void revokechild_handler(struct tevent_context *ev,
1638 struct tevent_fd *fde,
1639 uint16_t flags, void *private_data)
1641 struct revokechild_handle *rc = talloc_get_type(private_data,
1642 struct revokechild_handle);
1643 int ret;
1644 char c;
1646 ret = sys_read(rc->fd[0], &c, 1);
1647 if (ret != 1) {
1648 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1649 rc->status = -1;
1650 talloc_free(rc);
1651 return;
1653 if (c != 0) {
1654 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1655 rc->status = -1;
1656 talloc_free(rc);
1657 return;
1660 talloc_free(rc);
1663 struct ctdb_revoke_state {
1664 struct ctdb_db_context *ctdb_db;
1665 TDB_DATA key;
1666 struct ctdb_ltdb_header *header;
1667 TDB_DATA data;
1668 int count;
1669 int status;
1670 int finished;
1673 static void update_record_cb(struct ctdb_client_control_state *state)
1675 struct ctdb_revoke_state *revoke_state;
1676 int ret;
1677 int32_t res;
1679 if (state == NULL) {
1680 return;
1682 revoke_state = state->async.private_data;
1684 state->async.fn = NULL;
1685 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1686 if ((ret != 0) || (res != 0)) {
1687 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1688 revoke_state->status = -1;
1691 revoke_state->count--;
1692 if (revoke_state->count <= 0) {
1693 revoke_state->finished = 1;
1697 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1699 struct ctdb_revoke_state *revoke_state = private_data;
1700 struct ctdb_client_control_state *state;
1702 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1703 if (state == NULL) {
1704 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1705 revoke_state->status = -1;
1706 return;
1708 state->async.fn = update_record_cb;
1709 state->async.private_data = revoke_state;
1711 revoke_state->count++;
1715 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1716 struct tevent_timer *te,
1717 struct timeval yt, void *private_data)
1719 struct ctdb_revoke_state *state = private_data;
1721 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1722 state->finished = 1;
1723 state->status = -1;
1726 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1728 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1729 struct ctdb_ltdb_header new_header;
1730 TDB_DATA new_data;
1732 state->ctdb_db = ctdb_db;
1733 state->key = key;
1734 state->header = header;
1735 state->data = data;
1737 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1739 tevent_add_timer(ctdb->ev, state,
1740 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1741 ctdb_revoke_timeout_handler, state);
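/* Pump the event loop until every outstanding update-record control has
   completed or the control timeout above fires. */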
1743 while (state->finished == 0) {
1744 tevent_loop_once(ctdb->ev);
1747 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1748 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1749 talloc_free(state);
1750 return -1;
1752 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1753 ctdb_ltdb_unlock(ctdb_db, key);
1754 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1755 talloc_free(state);
1756 return -1;
1758 header->rsn++;
1759 if (new_header.rsn > header->rsn) {
1760 ctdb_ltdb_unlock(ctdb_db, key);
1761 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1762 talloc_free(state);
1763 return -1;
1765 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1766 ctdb_ltdb_unlock(ctdb_db, key);
1767 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1768 talloc_free(state);
1769 return -1;
1773 * If the revoke succeeded on all nodes, the revoke is complete. Otherwise,
1774 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1776 if (state->status == 0) {
1777 new_header.rsn++;
1778 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1779 } else {
1780 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1781 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1783 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1784 ctdb_ltdb_unlock(ctdb_db, key);
1785 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1786 talloc_free(state);
1787 return -1;
1789 ctdb_ltdb_unlock(ctdb_db, key);
1791 talloc_free(state);
1792 return 0;
1796 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1798 TDB_DATA tdata;
1799 struct revokechild_handle *rc;
1800 pid_t parent = getpid();
1801 int ret;
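/* The parent pid is captured above so the forked revoke child can exit
   once the main daemon goes away (see ctdb_wait_for_process_to_exit
   below). */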
1803 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1804 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1805 header->rsn -= 1;
1807 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1808 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1809 return -1;
1812 tdata = tdb_fetch(ctdb_db->rottdb, key);
1813 if (tdata.dsize > 0) {
1814 uint8_t *tmp;
1816 tmp = tdata.dptr;
1817 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1818 free(tmp);
1821 rc->status = 0;
1822 rc->ctdb = ctdb;
1823 rc->ctdb_db = ctdb_db;
1824 rc->fd[0] = -1;
1825 rc->fd[1] = -1;
1827 talloc_set_destructor(rc, revokechild_destructor);
1829 rc->key.dsize = key.dsize;
1830 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1831 if (rc->key.dptr == NULL) {
1832 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1833 talloc_free(rc);
1834 return -1;
1837 ret = pipe(rc->fd);
1838 if (ret != 0) {
1839 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1840 talloc_free(rc);
1841 return -1;
1845 rc->child = ctdb_fork(ctdb);
1846 if (rc->child == (pid_t)-1) {
1847 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1848 talloc_free(rc);
1849 return -1;
1852 if (rc->child == 0) {
1853 char c = 0;
1854 close(rc->fd[0]);
1855 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1857 prctl_set_comment("ctdb_revokechild");
1858 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1859 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1860 c = 1;
1861 goto child_finished;
1864 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1866 child_finished:
1867 sys_write(rc->fd[1], &c, 1);
1868 ctdb_wait_for_process_to_exit(parent);
1869 _exit(0);
1872 close(rc->fd[1]);
1873 rc->fd[1] = -1;
1874 set_close_on_exec(rc->fd[0]);
1876 /* This is an active revokechild process */
1877 DLIST_ADD_END(ctdb_db->revokechild_active, rc);
1879 rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1880 revokechild_handler, (void *)rc);
1881 if (rc->fde == NULL) {
1882 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1883 talloc_free(rc);
1885 tevent_fd_set_auto_close(rc->fde);
1887 return 0;
1890 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1892 struct revokechild_handle *rc;
1893 struct revokechild_deferred_call *deferred_call;
1895 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1896 if (rc->key.dsize == 0) {
1897 continue;
1899 if (rc->key.dsize != key.dsize) {
1900 continue;
1902 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1903 break;
1907 if (rc == NULL) {
1908 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1909 return -1;
1912 deferred_call = talloc(rc, struct revokechild_deferred_call);
1913 if (deferred_call == NULL) {
1914 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1915 return -1;
1918 deferred_call->ctdb = ctdb;
1919 deferred_call->hdr = hdr;
1920 deferred_call->fn = fn;
1921 deferred_call->ctx = call_context;
1923 talloc_set_destructor(deferred_call, deferred_call_destructor);
1924 talloc_steal(deferred_call, hdr);
1926 return 0;