s4:provision_users.ldif: Add Protected Users group
[Samba.git] / ctdb / server / ctdb_call.c
blob14baa797bd637cfc595ebcfeff2e2b0b5e8fad27
1 /*
2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/rb_tree.h"
40 #include "common/reqid.h"
41 #include "common/system.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44 #include "common/hash_count.h"
46 struct ctdb_sticky_record {
47 struct ctdb_context *ctdb;
48 struct ctdb_db_context *ctdb_db;
49 TDB_CONTEXT *pindown;
53 find the ctdb_db from a db index
55 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
57 struct ctdb_db_context *ctdb_db;
59 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
60 if (ctdb_db->db_id == id) {
61 break;
64 return ctdb_db;
68 a variant of input packet that can be used in lock requeue
70 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
72 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
73 ctdb_input_pkt(ctdb, hdr);
78 send an error reply
80 static void ctdb_send_error(struct ctdb_context *ctdb,
81 struct ctdb_req_header *hdr, uint32_t status,
82 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
83 static void ctdb_send_error(struct ctdb_context *ctdb,
84 struct ctdb_req_header *hdr, uint32_t status,
85 const char *fmt, ...)
87 va_list ap;
88 struct ctdb_reply_error_old *r;
89 char *msg;
90 int msglen, len;
92 if (ctdb->methods == NULL) {
93 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
94 return;
97 va_start(ap, fmt);
98 msg = talloc_vasprintf(ctdb, fmt, ap);
99 if (msg == NULL) {
100 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
102 va_end(ap);
104 msglen = strlen(msg)+1;
105 len = offsetof(struct ctdb_reply_error_old, msg);
106 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
107 struct ctdb_reply_error_old);
108 CTDB_NO_MEMORY_FATAL(ctdb, r);
110 r->hdr.destnode = hdr->srcnode;
111 r->hdr.reqid = hdr->reqid;
112 r->status = status;
113 r->msglen = msglen;
114 memcpy(&r->msg[0], msg, msglen);
116 ctdb_queue_packet(ctdb, &r->hdr);
118 talloc_free(msg);
123 * send a redirect reply
125 * The logic behind this function is this:
127 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
128 * to its local ctdb (ctdb_request_call). If the node is not itself
129 * the record's DMASTER, it first redirects the packet to the
130 * record's LMASTER. The LMASTER then redirects the call packet to
131 * the current DMASTER. Note that this works because of this: When
132 * a record is migrated off a node, then the new DMASTER is stored
133 * in the record's copy on the former DMASTER.
135 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
136 struct ctdb_db_context *ctdb_db,
137 TDB_DATA key,
138 struct ctdb_req_call_old *c,
139 struct ctdb_ltdb_header *header)
141 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
143 c->hdr.destnode = lmaster;
144 if (ctdb->pnn == lmaster) {
145 c->hdr.destnode = header->dmaster;
147 c->hopcount++;
149 if (c->hopcount%100 > 95) {
150 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
151 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
152 "header->dmaster:%d dst:%d\n",
153 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
154 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
155 header->dmaster, c->hdr.destnode));
158 ctdb_queue_packet(ctdb, &c->hdr);
163 send a dmaster reply
165 caller must have the chainlock before calling this routine. Caller must be
166 the lmaster
168 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
169 struct ctdb_ltdb_header *header,
170 TDB_DATA key, TDB_DATA data,
171 uint32_t new_dmaster,
172 uint32_t reqid)
174 struct ctdb_context *ctdb = ctdb_db->ctdb;
175 struct ctdb_reply_dmaster_old *r;
176 int ret, len;
177 TALLOC_CTX *tmp_ctx;
179 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
180 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
181 return;
184 header->dmaster = new_dmaster;
185 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
186 if (ret != 0) {
187 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
188 return;
191 if (ctdb->methods == NULL) {
192 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
193 return;
196 /* put the packet on a temporary context, allowing us to safely free
197 it below even if ctdb_reply_dmaster() has freed it already */
198 tmp_ctx = talloc_new(ctdb);
200 /* send the CTDB_REPLY_DMASTER */
201 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
202 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
203 struct ctdb_reply_dmaster_old);
204 CTDB_NO_MEMORY_FATAL(ctdb, r);
206 r->hdr.destnode = new_dmaster;
207 r->hdr.reqid = reqid;
208 r->hdr.generation = ctdb_db->generation;
209 r->rsn = header->rsn;
210 r->keylen = key.dsize;
211 r->datalen = data.dsize;
212 r->db_id = ctdb_db->db_id;
213 memcpy(&r->data[0], key.dptr, key.dsize);
214 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
215 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
217 ctdb_queue_packet(ctdb, &r->hdr);
219 talloc_free(tmp_ctx);
223 send a dmaster request (give another node the dmaster for a record)
225 This is always sent to the lmaster, which ensures that the lmaster
226 always knows who the dmaster is. The lmaster will then send a
227 CTDB_REPLY_DMASTER to the new dmaster
229 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
230 struct ctdb_req_call_old *c,
231 struct ctdb_ltdb_header *header,
232 TDB_DATA *key, TDB_DATA *data)
234 struct ctdb_req_dmaster_old *r;
235 struct ctdb_context *ctdb = ctdb_db->ctdb;
236 int len;
237 uint32_t lmaster = ctdb_lmaster(ctdb, key);
239 if (ctdb->methods == NULL) {
240 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
241 return;
244 if (data->dsize != 0) {
245 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
248 if (lmaster == ctdb->pnn) {
249 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
250 c->hdr.srcnode, c->hdr.reqid);
251 return;
254 len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
255 + sizeof(uint32_t);
256 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
257 struct ctdb_req_dmaster_old);
258 CTDB_NO_MEMORY_FATAL(ctdb, r);
259 r->hdr.destnode = lmaster;
260 r->hdr.reqid = c->hdr.reqid;
261 r->hdr.generation = ctdb_db->generation;
262 r->db_id = c->db_id;
263 r->rsn = header->rsn;
264 r->dmaster = c->hdr.srcnode;
265 r->keylen = key->dsize;
266 r->datalen = data->dsize;
267 memcpy(&r->data[0], key->dptr, key->dsize);
268 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
269 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
271 header->dmaster = c->hdr.srcnode;
272 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
273 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
276 ctdb_queue_packet(ctdb, &r->hdr);
278 talloc_free(r);
281 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
282 struct tevent_timer *te,
283 struct timeval t, void *private_data)
285 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
286 struct ctdb_sticky_record);
288 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
289 if (sr->pindown != NULL) {
290 talloc_free(sr->pindown);
291 sr->pindown = NULL;
295 static int
296 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
298 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
299 uint32_t *k;
300 struct ctdb_sticky_record *sr;
302 k = ctdb_key_to_idkey(tmp_ctx, key);
303 if (k == NULL) {
304 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
305 talloc_free(tmp_ctx);
306 return -1;
309 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
310 if (sr == NULL) {
311 talloc_free(tmp_ctx);
312 return 0;
315 talloc_free(tmp_ctx);
317 if (sr->pindown == NULL) {
318 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
319 sr->pindown = talloc_new(sr);
320 if (sr->pindown == NULL) {
321 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
322 return -1;
324 tevent_add_timer(ctdb->ev, sr->pindown,
325 timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
326 (ctdb->tunable.sticky_pindown * 1000) % 1000000),
327 ctdb_sticky_pindown_timeout, sr);
330 return 0;
334 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
335 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
337 must be called with the chainlock held. This function releases the chainlock
339 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
340 struct ctdb_req_header *hdr,
341 TDB_DATA key, TDB_DATA data,
342 uint64_t rsn, uint32_t record_flags)
344 struct ctdb_call_state *state;
345 struct ctdb_context *ctdb = ctdb_db->ctdb;
346 struct ctdb_ltdb_header header;
347 int ret;
349 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
351 ZERO_STRUCT(header);
352 header.rsn = rsn;
353 header.dmaster = ctdb->pnn;
354 header.flags = record_flags;
356 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
358 if (state) {
359 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
361 * We temporarily add the VACUUM_MIGRATED flag to
362 * the record flags, so that ctdb_ltdb_store can
363 * decide whether the record should be stored or
364 * deleted.
366 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
370 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
371 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
373 ret = ctdb_ltdb_unlock(ctdb_db, key);
374 if (ret != 0) {
375 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
377 return;
380 /* we just became DMASTER and this database is "sticky",
381 see if the record is flagged as "hot" and set up a pin-down
382 context to stop migrations for a little while if so
384 if (ctdb_db_sticky(ctdb_db)) {
385 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
388 if (state == NULL) {
389 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
390 ctdb->pnn, hdr->reqid, hdr->srcnode));
392 ret = ctdb_ltdb_unlock(ctdb_db, key);
393 if (ret != 0) {
394 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
396 return;
399 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
400 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
402 ret = ctdb_ltdb_unlock(ctdb_db, key);
403 if (ret != 0) {
404 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
406 return;
409 if (hdr->reqid != state->reqid) {
410 /* we found a record but it was the wrong one */
411 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
413 ret = ctdb_ltdb_unlock(ctdb_db, key);
414 if (ret != 0) {
415 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
417 return;
420 (void) hash_count_increment(ctdb_db->migratedb, key);
422 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
424 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
425 if (ret != 0) {
426 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
429 state->state = CTDB_CALL_DONE;
430 if (state->async.fn) {
431 state->async.fn(state);
/* one deferred request waiting for an in-flight dmaster update */
struct dmaster_defer_call {
        struct dmaster_defer_call *next, *prev;
        struct ctdb_context *ctdb;
        struct ctdb_req_header *hdr;
};

/* per-key queue of requests deferred while a dmaster update is pending */
struct dmaster_defer_queue {
        struct ctdb_db_context *ctdb_db;
        uint32_t generation;
        struct dmaster_defer_call *deferred_calls;
};
447 static void dmaster_defer_reprocess(struct tevent_context *ev,
448 struct tevent_timer *te,
449 struct timeval t,
450 void *private_data)
452 struct dmaster_defer_call *call = talloc_get_type(
453 private_data, struct dmaster_defer_call);
455 ctdb_input_pkt(call->ctdb, call->hdr);
456 talloc_free(call);
459 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
461 /* Ignore requests, if database recovery happens in-between. */
462 if (ddq->generation != ddq->ctdb_db->generation) {
463 return 0;
466 while (ddq->deferred_calls != NULL) {
467 struct dmaster_defer_call *call = ddq->deferred_calls;
469 DLIST_REMOVE(ddq->deferred_calls, call);
471 talloc_steal(call->ctdb, call);
472 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
473 dmaster_defer_reprocess, call);
475 return 0;
/*
  trbt insert callback: replace any existing queue (freeing it) with
  the new one being inserted.
 */
static void *insert_ddq_callback(void *parm, void *data)
{
        if (data) {
                talloc_free(data);
        }
        return parm;
}
487 * This function is used to register a key in database that needs to be updated.
488 * Any requests for that key should get deferred till this is completed.
490 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
491 struct ctdb_req_header *hdr,
492 TDB_DATA key)
494 uint32_t *k;
495 struct dmaster_defer_queue *ddq;
497 k = ctdb_key_to_idkey(hdr, key);
498 if (k == NULL) {
499 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
500 return -1;
503 /* Already exists */
504 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
505 if (ddq != NULL) {
506 if (ddq->generation == ctdb_db->generation) {
507 talloc_free(k);
508 return 0;
511 /* Recovery occurred - get rid of old queue. All the deferred
512 * requests will be resent anyway from ctdb_call_resend_db.
514 talloc_free(ddq);
517 ddq = talloc(hdr, struct dmaster_defer_queue);
518 if (ddq == NULL) {
519 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
520 talloc_free(k);
521 return -1;
523 ddq->ctdb_db = ctdb_db;
524 ddq->generation = hdr->generation;
525 ddq->deferred_calls = NULL;
527 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
528 insert_ddq_callback, ddq);
529 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
531 talloc_free(k);
532 return 0;
535 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
536 struct ctdb_req_header *hdr,
537 TDB_DATA key)
539 struct dmaster_defer_queue *ddq;
540 struct dmaster_defer_call *call;
541 uint32_t *k;
543 k = ctdb_key_to_idkey(hdr, key);
544 if (k == NULL) {
545 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
546 return -1;
549 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
550 if (ddq == NULL) {
551 talloc_free(k);
552 return -1;
555 talloc_free(k);
557 if (ddq->generation != hdr->generation) {
558 talloc_set_destructor(ddq, NULL);
559 talloc_free(ddq);
560 return -1;
563 call = talloc(ddq, struct dmaster_defer_call);
564 if (call == NULL) {
565 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
566 return -1;
569 call->ctdb = ctdb_db->ctdb;
570 call->hdr = talloc_steal(call, hdr);
572 DLIST_ADD_END(ddq->deferred_calls, call);
574 return 0;
578 called when a CTDB_REQ_DMASTER packet comes in
580 this comes into the lmaster for a record when the current dmaster
581 wants to give up the dmaster role and give it to someone else
583 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
585 struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
586 TDB_DATA key, data, data2;
587 struct ctdb_ltdb_header header;
588 struct ctdb_db_context *ctdb_db;
589 uint32_t record_flags = 0;
590 size_t len;
591 int ret;
593 key.dptr = c->data;
594 key.dsize = c->keylen;
595 data.dptr = c->data + c->keylen;
596 data.dsize = c->datalen;
597 len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
598 + sizeof(uint32_t);
599 if (len <= c->hdr.length) {
600 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
601 sizeof(record_flags));
604 ctdb_db = find_ctdb_db(ctdb, c->db_id);
605 if (!ctdb_db) {
606 ctdb_send_error(ctdb, hdr, -1,
607 "Unknown database in request. db_id==0x%08x",
608 c->db_id);
609 return;
612 dmaster_defer_setup(ctdb_db, hdr, key);
614 /* fetch the current record */
615 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
616 ctdb_call_input_pkt, ctdb, false);
617 if (ret == -1) {
618 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
619 return;
621 if (ret == -2) {
622 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
623 return;
626 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
627 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
628 "db=%s lmaster=%u gen=%u curgen=%u\n",
629 ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
630 hdr->generation, ctdb_db->generation));
631 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
634 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
635 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
637 /* its a protocol error if the sending node is not the current dmaster */
638 if (header.dmaster != hdr->srcnode) {
639 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
640 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
641 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
642 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
643 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
644 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
645 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
647 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
648 ctdb_ltdb_unlock(ctdb_db, key);
649 return;
653 if (header.rsn > c->rsn) {
654 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
655 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
656 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
657 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
660 /* use the rsn from the sending node */
661 header.rsn = c->rsn;
663 /* store the record flags from the sending node */
664 header.flags = record_flags;
666 /* check if the new dmaster is the lmaster, in which case we
667 skip the dmaster reply */
668 if (c->dmaster == ctdb->pnn) {
669 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
670 } else {
671 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
673 ret = ctdb_ltdb_unlock(ctdb_db, key);
674 if (ret != 0) {
675 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
680 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
681 struct tevent_timer *te,
682 struct timeval t, void *private_data)
684 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
685 struct ctdb_sticky_record);
686 talloc_free(sr);
689 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
691 if (data) {
692 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
693 talloc_free(data);
695 return parm;
698 static int
699 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
701 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
702 uint32_t *k;
703 struct ctdb_sticky_record *sr;
705 k = ctdb_key_to_idkey(tmp_ctx, key);
706 if (k == NULL) {
707 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
708 talloc_free(tmp_ctx);
709 return -1;
712 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
713 if (sr != NULL) {
714 talloc_free(tmp_ctx);
715 return 0;
718 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
719 if (sr == NULL) {
720 talloc_free(tmp_ctx);
721 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
722 return -1;
725 sr->ctdb = ctdb;
726 sr->ctdb_db = ctdb_db;
727 sr->pindown = NULL;
729 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
730 ctdb->tunable.sticky_duration,
731 ctdb_db->db_name, ctdb_hash(&key)));
733 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
735 tevent_add_timer(ctdb->ev, sr,
736 timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
737 ctdb_sticky_record_timeout, sr);
739 talloc_free(tmp_ctx);
740 return 0;
/* state for re-injecting a request once a pindown is released */
struct pinned_down_requeue_handle {
        struct ctdb_context *ctdb;
        struct ctdb_req_header *hdr;
};

/* a request deferred because its record is currently pinned down */
struct pinned_down_deferred_call {
        struct ctdb_context *ctdb;
        struct ctdb_req_header *hdr;
};
753 static void pinned_down_requeue(struct tevent_context *ev,
754 struct tevent_timer *te,
755 struct timeval t, void *private_data)
757 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
758 struct ctdb_context *ctdb = handle->ctdb;
760 talloc_steal(ctdb, handle->hdr);
761 ctdb_call_input_pkt(ctdb, handle->hdr);
763 talloc_free(handle);
766 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
768 struct ctdb_context *ctdb = pinned_down->ctdb;
769 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
771 handle->ctdb = pinned_down->ctdb;
772 handle->hdr = pinned_down->hdr;
773 talloc_steal(handle, handle->hdr);
775 tevent_add_timer(ctdb->ev, handle, timeval_zero(),
776 pinned_down_requeue, handle);
778 return 0;
781 static int
782 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
784 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
785 uint32_t *k;
786 struct ctdb_sticky_record *sr;
787 struct pinned_down_deferred_call *pinned_down;
789 k = ctdb_key_to_idkey(tmp_ctx, key);
790 if (k == NULL) {
791 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
792 talloc_free(tmp_ctx);
793 return -1;
796 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
797 if (sr == NULL) {
798 talloc_free(tmp_ctx);
799 return -1;
802 talloc_free(tmp_ctx);
804 if (sr->pindown == NULL) {
805 return -1;
808 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
809 if (pinned_down == NULL) {
810 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
811 return -1;
814 pinned_down->ctdb = ctdb;
815 pinned_down->hdr = hdr;
817 talloc_set_destructor(pinned_down, pinned_down_destructor);
818 talloc_steal(pinned_down, hdr);
820 return 0;
823 static int hot_key_cmp(const void *a, const void *b)
825 const struct ctdb_db_hot_key *ka = (const struct ctdb_db_hot_key *)a;
826 const struct ctdb_db_hot_key *kb = (const struct ctdb_db_hot_key *)b;
828 if (ka->count < kb->count) {
829 return -1;
831 if (ka->count > kb->count) {
832 return 1;
835 return 0;
838 static void
839 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key,
840 unsigned int count)
842 unsigned int i, id;
843 char *keystr;
846 * If all slots are being used then only need to compare
847 * against the count in the 0th slot, since it contains the
848 * smallest count.
850 if (ctdb_db->statistics.num_hot_keys == MAX_HOT_KEYS &&
851 count <= ctdb_db->hot_keys[0].count) {
852 return;
855 /* see if we already know this key */
856 for (i = 0; i < MAX_HOT_KEYS; i++) {
857 if (key.dsize != ctdb_db->hot_keys[i].key.dsize) {
858 continue;
860 if (memcmp(key.dptr, ctdb_db->hot_keys[i].key.dptr, key.dsize)) {
861 continue;
863 /* found an entry for this key */
864 if (count <= ctdb_db->hot_keys[i].count) {
865 return;
867 if (count >= (2 * ctdb_db->hot_keys[i].last_logged_count)) {
868 keystr = hex_encode_talloc(ctdb_db,
869 (unsigned char *)key.dptr,
870 key.dsize);
871 D_NOTICE("Updated hot key database=%s key=%s count=%d\n",
872 ctdb_db->db_name,
873 keystr ? keystr : "" ,
874 count);
875 TALLOC_FREE(keystr);
876 ctdb_db->hot_keys[i].last_logged_count = count;
878 ctdb_db->hot_keys[i].count = count;
879 goto sort_keys;
882 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
883 id = ctdb_db->statistics.num_hot_keys;
884 ctdb_db->statistics.num_hot_keys++;
885 } else {
886 id = 0;
889 if (ctdb_db->hot_keys[id].key.dptr != NULL) {
890 talloc_free(ctdb_db->hot_keys[id].key.dptr);
892 ctdb_db->hot_keys[id].key.dsize = key.dsize;
893 ctdb_db->hot_keys[id].key.dptr = talloc_memdup(ctdb_db,
894 key.dptr,
895 key.dsize);
896 ctdb_db->hot_keys[id].count = count;
898 keystr = hex_encode_talloc(ctdb_db,
899 (unsigned char *)key.dptr, key.dsize);
900 D_NOTICE("Added hot key database=%s key=%s count=%d\n",
901 ctdb_db->db_name,
902 keystr ? keystr : "" ,
903 count);
904 talloc_free(keystr);
905 ctdb_db->hot_keys[id].last_logged_count = count;
907 sort_keys:
908 qsort(&ctdb_db->hot_keys[0],
909 ctdb_db->statistics.num_hot_keys,
910 sizeof(struct ctdb_db_hot_key),
911 hot_key_cmp);
915 called when a CTDB_REQ_CALL packet comes in
917 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
919 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
920 TDB_DATA data;
921 struct ctdb_reply_call_old *r;
922 int ret, len;
923 struct ctdb_ltdb_header header;
924 struct ctdb_call *call;
925 struct ctdb_db_context *ctdb_db;
926 int tmp_count, bucket;
928 if (ctdb->methods == NULL) {
929 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
930 return;
934 ctdb_db = find_ctdb_db(ctdb, c->db_id);
935 if (!ctdb_db) {
936 ctdb_send_error(ctdb, hdr, -1,
937 "Unknown database in request. db_id==0x%08x",
938 c->db_id);
939 return;
942 call = talloc(hdr, struct ctdb_call);
943 CTDB_NO_MEMORY_FATAL(ctdb, call);
945 call->call_id = c->callid;
946 call->key.dptr = c->data;
947 call->key.dsize = c->keylen;
948 call->call_data.dptr = c->data + c->keylen;
949 call->call_data.dsize = c->calldatalen;
950 call->reply_data.dptr = NULL;
951 call->reply_data.dsize = 0;
954 /* If this record is pinned down we should defer the
955 request until the pindown times out
957 if (ctdb_db_sticky(ctdb_db)) {
958 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
959 DEBUG(DEBUG_WARNING,
960 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
961 talloc_free(call);
962 return;
966 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
967 talloc_free(call);
968 return;
971 /* determine if we are the dmaster for this key. This also
972 fetches the record data (if any), thus avoiding a 2nd fetch of the data
973 if the call will be answered locally */
975 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
976 ctdb_call_input_pkt, ctdb, false);
977 if (ret == -1) {
978 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
979 talloc_free(call);
980 return;
982 if (ret == -2) {
983 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
984 talloc_free(call);
985 return;
988 /* Dont do READONLY if we don't have a tracking database */
989 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
990 c->flags &= ~CTDB_WANT_READONLY;
993 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
994 header.flags &= ~CTDB_REC_RO_FLAGS;
995 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
996 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
997 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
998 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
1000 /* and clear out the tracking data */
1001 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
1002 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
1006 /* if we are revoking, we must defer all other calls until the revoke
1007 * had completed.
1009 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
1010 talloc_free(data.dptr);
1011 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1013 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1014 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1016 talloc_free(call);
1017 return;
1021 * If we are not the dmaster and are not hosting any delegations,
1022 * then we redirect the request to the node than can answer it
1023 * (the lmaster or the dmaster).
1025 if ((header.dmaster != ctdb->pnn)
1026 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
1027 talloc_free(data.dptr);
1028 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
1030 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1031 if (ret != 0) {
1032 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1034 talloc_free(call);
1035 return;
1038 if ( (!(c->flags & CTDB_WANT_READONLY))
1039 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1040 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1041 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1042 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1044 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1046 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1047 ctdb_fatal(ctdb, "Failed to start record revoke");
1049 talloc_free(data.dptr);
1051 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1052 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1054 talloc_free(call);
1056 return;
1059 /* If this is the first request for delegation. bump rsn and set
1060 * the delegations flag
1062 if ((c->flags & CTDB_WANT_READONLY)
1063 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1064 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1065 header.rsn += 3;
1066 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1067 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1068 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1071 if ((c->flags & CTDB_WANT_READONLY)
1072 && ((unsigned int)call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1073 TDB_DATA tdata;
1075 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1076 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1077 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1079 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1080 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1082 free(tdata.dptr);
1084 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1085 if (ret != 0) {
1086 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1089 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1090 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1091 struct ctdb_reply_call_old);
1092 CTDB_NO_MEMORY_FATAL(ctdb, r);
1093 r->hdr.destnode = c->hdr.srcnode;
1094 r->hdr.reqid = c->hdr.reqid;
1095 r->hdr.generation = ctdb_db->generation;
1096 r->status = 0;
1097 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
1098 header.rsn -= 2;
1099 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1100 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1101 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1103 if (data.dsize) {
1104 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1107 ctdb_queue_packet(ctdb, &r->hdr);
1108 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1109 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1111 talloc_free(r);
1112 talloc_free(call);
1113 return;
1116 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1117 tmp_count = c->hopcount;
1118 bucket = 0;
1119 while (tmp_count) {
1120 tmp_count >>= 1;
1121 bucket++;
1123 if (bucket >= MAX_COUNT_BUCKETS) {
1124 bucket = MAX_COUNT_BUCKETS - 1;
1126 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1127 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1129 /* If this database supports sticky records, then check if the
1130 hopcount is big. If it is it means the record is hot and we
1131 should make it sticky.
1133 if (ctdb_db_sticky(ctdb_db) &&
1134 c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1135 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1139 /* Try if possible to migrate the record off to the caller node.
1140 * From the clients perspective a fetch of the data is just as
1141 * expensive as a migration.
1143 if (c->hdr.srcnode != ctdb->pnn) {
1144 if (ctdb_db->persistent_state) {
1145 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1146 " of key %s while transaction is active\n",
1147 (char *)call->key.dptr));
1148 } else {
1149 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1150 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1151 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1152 talloc_free(data.dptr);
1154 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1155 if (ret != 0) {
1156 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1159 talloc_free(call);
1160 return;
1163 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1164 if (ret != 0) {
1165 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1166 call->status = -1;
1169 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1170 if (ret != 0) {
1171 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1174 len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1175 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1176 struct ctdb_reply_call_old);
1177 CTDB_NO_MEMORY_FATAL(ctdb, r);
1178 r->hdr.destnode = hdr->srcnode;
1179 r->hdr.reqid = hdr->reqid;
1180 r->hdr.generation = ctdb_db->generation;
1181 r->status = call->status;
1182 r->datalen = call->reply_data.dsize;
1183 if (call->reply_data.dsize) {
1184 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1187 ctdb_queue_packet(ctdb, &r->hdr);
1189 talloc_free(r);
1190 talloc_free(call);
1194 * called when a CTDB_REPLY_CALL packet comes in
1196 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1197 * contains any reply data from the call
1199 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1201 struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1202 struct ctdb_call_state *state;
1204 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1205 if (state == NULL) {
1206 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1207 return;
1210 if (hdr->reqid != state->reqid) {
1211 /* we found a record but it was the wrong one */
1212 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1213 return;
1217 /* read only delegation processing */
1218 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1219 * delegation since we may need to update the record header
1221 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1222 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1223 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1224 struct ctdb_ltdb_header oldheader;
1225 TDB_DATA key, data, olddata;
1226 int ret;
1228 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1229 goto finished_ro;
1230 return;
1233 key.dsize = state->c->keylen;
1234 key.dptr = state->c->data;
1235 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1236 ctdb_call_input_pkt, ctdb, false);
1237 if (ret == -2) {
1238 return;
1240 if (ret != 0) {
1241 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1242 return;
1245 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1246 if (ret != 0) {
1247 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1248 ctdb_ltdb_unlock(ctdb_db, key);
1249 goto finished_ro;
1252 if (header->rsn <= oldheader.rsn) {
1253 ctdb_ltdb_unlock(ctdb_db, key);
1254 goto finished_ro;
1257 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1258 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1259 ctdb_ltdb_unlock(ctdb_db, key);
1260 goto finished_ro;
1263 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1264 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1265 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1266 if (ret != 0) {
1267 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1268 ctdb_ltdb_unlock(ctdb_db, key);
1269 goto finished_ro;
1272 ctdb_ltdb_unlock(ctdb_db, key);
1274 finished_ro:
1276 state->call->reply_data.dptr = c->data;
1277 state->call->reply_data.dsize = c->datalen;
1278 state->call->status = c->status;
1280 talloc_steal(state, c);
1282 state->state = CTDB_CALL_DONE;
1283 if (state->async.fn) {
1284 state->async.fn(state);
1290 * called when a CTDB_REPLY_DMASTER packet comes in
1292 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1293 * request packet. It means that the current dmaster wants to give us
1294 * the dmaster role.
1296 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1298 struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1299 struct ctdb_db_context *ctdb_db;
1300 TDB_DATA key, data;
1301 uint32_t record_flags = 0;
1302 size_t len;
1303 int ret;
1305 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1306 if (ctdb_db == NULL) {
1307 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1308 return;
1311 key.dptr = c->data;
1312 key.dsize = c->keylen;
1313 data.dptr = &c->data[key.dsize];
1314 data.dsize = c->datalen;
1315 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1316 + sizeof(uint32_t);
1317 if (len <= c->hdr.length) {
1318 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1319 sizeof(record_flags));
1322 dmaster_defer_setup(ctdb_db, hdr, key);
1324 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1325 ctdb_call_input_pkt, ctdb, false);
1326 if (ret == -2) {
1327 return;
1329 if (ret != 0) {
1330 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1331 return;
1334 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1339 called when a CTDB_REPLY_ERROR packet comes in
1341 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1343 struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1344 struct ctdb_call_state *state;
1346 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1347 if (state == NULL) {
1348 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1349 ctdb->pnn, hdr->reqid));
1350 return;
1353 if (hdr->reqid != state->reqid) {
1354 /* we found a record but it was the wrong one */
1355 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1356 return;
1359 talloc_steal(state, c);
1361 state->state = CTDB_CALL_ERROR;
1362 state->errmsg = (char *)c->msg;
1363 if (state->async.fn) {
1364 state->async.fn(state);
1370 destroy a ctdb_call
1372 static int ctdb_call_destructor(struct ctdb_call_state *state)
1374 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1375 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1376 return 0;
1381 called when a ctdb_call needs to be resent after a reconfigure event
1383 static void ctdb_call_resend(struct ctdb_call_state *state)
1385 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1387 state->generation = state->ctdb_db->generation;
1389 /* use a new reqid, in case the old reply does eventually come in */
1390 reqid_remove(ctdb->idr, state->reqid);
1391 state->reqid = reqid_new(ctdb->idr, state);
1392 state->c->hdr.reqid = state->reqid;
1394 /* update the generation count for this request, so its valid with the new vnn_map */
1395 state->c->hdr.generation = state->generation;
1397 /* send the packet to ourselves, it will be redirected appropriately */
1398 state->c->hdr.destnode = ctdb->pnn;
1400 ctdb_queue_packet(ctdb, &state->c->hdr);
1401 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1402 state->ctdb_db->db_name, state->reqid, state->generation));
1406 resend all pending calls on recovery
1408 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1410 struct ctdb_call_state *state, *next;
1412 for (state = ctdb_db->pending_calls; state; state = next) {
1413 next = state->next;
1414 ctdb_call_resend(state);
1418 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1420 struct ctdb_db_context *ctdb_db;
1422 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1423 ctdb_call_resend_db(ctdb_db);
1428 this allows the caller to setup a async.fn
1430 static void call_local_trigger(struct tevent_context *ev,
1431 struct tevent_timer *te,
1432 struct timeval t, void *private_data)
1434 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1435 if (state->async.fn) {
1436 state->async.fn(state);
1442 construct an event driven local ctdb_call
1444 this is used so that locally processed ctdb_call requests are processed
1445 in an event driven manner
1447 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1448 struct ctdb_call *call,
1449 struct ctdb_ltdb_header *header,
1450 TDB_DATA *data)
1452 struct ctdb_call_state *state;
1453 struct ctdb_context *ctdb = ctdb_db->ctdb;
1454 int ret;
1456 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1457 CTDB_NO_MEMORY_NULL(ctdb, state);
1459 talloc_steal(state, data->dptr);
1461 state->state = CTDB_CALL_DONE;
1462 state->call = talloc(state, struct ctdb_call);
1463 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1464 *(state->call) = *call;
1465 state->ctdb_db = ctdb_db;
1467 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1468 if (ret != 0) {
1469 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1472 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1473 call_local_trigger, state);
1475 return state;
1480 make a remote ctdb call - async send. Called in daemon context.
1482 This constructs a ctdb_call request and queues it for processing.
1483 This call never blocks.
1485 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1486 struct ctdb_call *call,
1487 struct ctdb_ltdb_header *header)
1489 uint32_t len;
1490 struct ctdb_call_state *state;
1491 struct ctdb_context *ctdb = ctdb_db->ctdb;
1492 struct ctdb_req_call_old *c;
1494 if (ctdb->methods == NULL) {
1495 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1496 return NULL;
1499 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1500 CTDB_NO_MEMORY_NULL(ctdb, state);
1501 state->call = talloc(state, struct ctdb_call);
1502 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1504 state->reqid = reqid_new(ctdb->idr, state);
1505 state->ctdb_db = ctdb_db;
1506 state->state = CTDB_CALL_WAIT;
1507 state->generation = ctdb_db->generation;
1509 len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize +
1510 call->call_data.dsize;
1512 c = ctdb_transport_allocate(ctdb,
1513 state,
1514 CTDB_REQ_CALL,
1515 len,
1516 struct ctdb_req_call_old);
1518 CTDB_NO_MEMORY_NULL(ctdb, c);
1519 state->c = c;
1521 c->hdr.destnode = header->dmaster;
1522 c->hdr.reqid = state->reqid;
1523 c->hdr.generation = ctdb_db->generation;
1524 c->flags = call->flags;
1525 c->db_id = ctdb_db->db_id;
1526 c->callid = call->call_id;
1527 c->hopcount = 0;
1528 c->keylen = call->key.dsize;
1529 c->calldatalen = call->call_data.dsize;
1531 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
1532 memcpy(&c->data[call->key.dsize],
1533 call->call_data.dptr,
1534 call->call_data.dsize);
1536 *(state->call) = *call;
1537 state->call->call_data.dptr = &c->data[call->key.dsize];
1538 state->call->key.dptr = &c->data[0];
1540 DLIST_ADD(ctdb_db->pending_calls, state);
1542 talloc_set_destructor(state, ctdb_call_destructor);
1543 ctdb_queue_packet(ctdb, &state->c->hdr);
1545 return state;
1549 make a remote ctdb call - async recv - called in daemon context
1551 This is called when the program wants to wait for a ctdb_call to complete and get the
1552 results. This call will block unless the call has already completed.
1554 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1556 while (state->state < CTDB_CALL_DONE) {
1557 tevent_loop_once(state->ctdb_db->ctdb->ev);
1559 if (state->state != CTDB_CALL_DONE) {
1560 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1561 talloc_free(state);
1562 return -1;
1565 if (state->call->reply_data.dsize) {
1566 call->reply_data.dptr = talloc_memdup(call,
1567 state->call->reply_data.dptr,
1568 state->call->reply_data.dsize);
1569 call->reply_data.dsize = state->call->reply_data.dsize;
1570 } else {
1571 call->reply_data.dptr = NULL;
1572 call->reply_data.dsize = 0;
1574 call->status = state->call->status;
1575 talloc_free(state);
1576 return 0;
1580 struct revokechild_deferred_call {
1581 struct revokechild_deferred_call *prev, *next;
1582 struct ctdb_context *ctdb;
1583 struct ctdb_req_header *hdr;
1584 deferred_requeue_fn fn;
1585 void *ctx;
1586 struct revokechild_handle *rev_hdl;
1589 struct revokechild_handle {
1590 struct revokechild_handle *next, *prev;
1591 struct ctdb_context *ctdb;
1592 struct ctdb_db_context *ctdb_db;
1593 struct tevent_fd *fde;
1594 int status;
1595 int fd[2];
1596 pid_t child;
1597 TDB_DATA key;
1598 struct revokechild_deferred_call *deferred_call_list;
1601 static void deferred_call_requeue(struct tevent_context *ev,
1602 struct tevent_timer *te,
1603 struct timeval t, void *private_data)
1605 struct revokechild_deferred_call *dlist = talloc_get_type_abort(
1606 private_data, struct revokechild_deferred_call);
1608 while (dlist != NULL) {
1609 struct revokechild_deferred_call *dcall = dlist;
1611 talloc_set_destructor(dcall, NULL);
1612 DLIST_REMOVE(dlist, dcall);
1613 dcall->fn(dcall->ctx, dcall->hdr);
1614 talloc_free(dcall);
1618 static int deferred_call_destructor(struct revokechild_deferred_call *dcall)
1620 struct revokechild_handle *rev_hdl = dcall->rev_hdl;
1622 DLIST_REMOVE(rev_hdl->deferred_call_list, dcall);
1623 return 0;
1626 static int revokechild_destructor(struct revokechild_handle *rev_hdl)
1628 struct revokechild_deferred_call *now_list = NULL;
1629 struct revokechild_deferred_call *delay_list = NULL;
1631 if (rev_hdl->fde != NULL) {
1632 talloc_free(rev_hdl->fde);
1635 if (rev_hdl->fd[0] != -1) {
1636 close(rev_hdl->fd[0]);
1638 if (rev_hdl->fd[1] != -1) {
1639 close(rev_hdl->fd[1]);
1641 ctdb_kill(rev_hdl->ctdb, rev_hdl->child, SIGKILL);
1643 DLIST_REMOVE(rev_hdl->ctdb_db->revokechild_active, rev_hdl);
1645 while (rev_hdl->deferred_call_list != NULL) {
1646 struct revokechild_deferred_call *dcall;
1648 dcall = rev_hdl->deferred_call_list;
1649 DLIST_REMOVE(rev_hdl->deferred_call_list, dcall);
1651 /* If revoke is successful, then first process all the calls
1652 * that need write access, and delay readonly requests by 1
1653 * second grace.
1655 * If revoke is unsuccessful, most likely because of node
1656 * failure, delay all the pending requests, so database can
1657 * be recovered.
1660 if (rev_hdl->status == 0) {
1661 struct ctdb_req_call_old *c;
1663 c = (struct ctdb_req_call_old *)dcall->hdr;
1664 if (c->flags & CTDB_WANT_READONLY) {
1665 DLIST_ADD(delay_list, dcall);
1666 } else {
1667 DLIST_ADD(now_list, dcall);
1669 } else {
1670 DLIST_ADD(delay_list, dcall);
1674 if (now_list != NULL) {
1675 tevent_add_timer(rev_hdl->ctdb->ev,
1676 rev_hdl->ctdb_db,
1677 tevent_timeval_current_ofs(0, 0),
1678 deferred_call_requeue,
1679 now_list);
1682 if (delay_list != NULL) {
1683 tevent_add_timer(rev_hdl->ctdb->ev,
1684 rev_hdl->ctdb_db,
1685 tevent_timeval_current_ofs(1, 0),
1686 deferred_call_requeue,
1687 delay_list);
1690 return 0;
1693 static void revokechild_handler(struct tevent_context *ev,
1694 struct tevent_fd *fde,
1695 uint16_t flags, void *private_data)
1697 struct revokechild_handle *rev_hdl =
1698 talloc_get_type(private_data, struct revokechild_handle);
1699 int ret;
1700 char c;
1702 ret = sys_read(rev_hdl->fd[0], &c, 1);
1703 if (ret != 1) {
1704 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1705 rev_hdl->status = -1;
1706 talloc_free(rev_hdl);
1707 return;
1709 if (c != 0) {
1710 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1711 rev_hdl->status = -1;
1712 talloc_free(rev_hdl);
1713 return;
1716 talloc_free(rev_hdl);
1719 struct ctdb_revoke_state {
1720 struct ctdb_db_context *ctdb_db;
1721 TDB_DATA key;
1722 struct ctdb_ltdb_header *header;
1723 TDB_DATA data;
1724 int count;
1725 int status;
1726 int finished;
1729 static void update_record_cb(struct ctdb_client_control_state *state)
1731 struct ctdb_revoke_state *revoke_state;
1732 int ret;
1733 int32_t res;
1735 if (state == NULL) {
1736 return;
1738 revoke_state = state->async.private_data;
1740 state->async.fn = NULL;
1741 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1742 if ((ret != 0) || (res != 0)) {
1743 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1744 revoke_state->status = -1;
1747 revoke_state->count--;
1748 if (revoke_state->count <= 0) {
1749 revoke_state->finished = 1;
1753 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1755 struct ctdb_revoke_state *revoke_state = private_data;
1756 struct ctdb_client_control_state *state;
1758 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1759 if (state == NULL) {
1760 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1761 revoke_state->status = -1;
1762 return;
1764 state->async.fn = update_record_cb;
1765 state->async.private_data = revoke_state;
1767 revoke_state->count++;
1771 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1772 struct tevent_timer *te,
1773 struct timeval yt, void *private_data)
1775 struct ctdb_revoke_state *state = private_data;
1777 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1778 state->finished = 1;
1779 state->status = -1;
1782 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1784 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1785 struct ctdb_ltdb_header new_header;
1786 TDB_DATA new_data;
1788 state->ctdb_db = ctdb_db;
1789 state->key = key;
1790 state->header = header;
1791 state->data = data;
1793 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1795 tevent_add_timer(ctdb->ev, state,
1796 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1797 ctdb_revoke_timeout_handler, state);
1799 while (state->finished == 0) {
1800 tevent_loop_once(ctdb->ev);
1803 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1804 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1805 talloc_free(state);
1806 return -1;
1808 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1809 ctdb_ltdb_unlock(ctdb_db, key);
1810 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1811 talloc_free(state);
1812 return -1;
1814 header->rsn++;
1815 if (new_header.rsn > header->rsn) {
1816 ctdb_ltdb_unlock(ctdb_db, key);
1817 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1818 talloc_free(state);
1819 return -1;
1821 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1822 ctdb_ltdb_unlock(ctdb_db, key);
1823 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1824 talloc_free(state);
1825 return -1;
1829 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1830 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1832 if (state->status == 0) {
1833 new_header.rsn++;
1834 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1835 } else {
1836 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1837 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1839 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1840 ctdb_ltdb_unlock(ctdb_db, key);
1841 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1842 talloc_free(state);
1843 return -1;
1845 ctdb_ltdb_unlock(ctdb_db, key);
1847 talloc_free(state);
1848 return 0;
1852 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb,
1853 struct ctdb_db_context *ctdb_db,
1854 TDB_DATA key,
1855 struct ctdb_ltdb_header *header,
1856 TDB_DATA data)
1858 TDB_DATA tdata;
1859 struct revokechild_handle *rev_hdl;
1860 pid_t parent = getpid();
1861 int ret;
1863 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY |
1864 CTDB_REC_RO_HAVE_DELEGATIONS |
1865 CTDB_REC_RO_HAVE_READONLY);
1867 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1868 header->rsn -= 1;
1870 rev_hdl = talloc_zero(ctdb_db, struct revokechild_handle);
1871 if (rev_hdl == NULL) {
1872 D_ERR("Failed to allocate revokechild_handle\n");
1873 return -1;
1876 tdata = tdb_fetch(ctdb_db->rottdb, key);
1877 if (tdata.dsize > 0) {
1878 uint8_t *tmp;
1880 tmp = tdata.dptr;
1881 tdata.dptr = talloc_memdup(rev_hdl, tdata.dptr, tdata.dsize);
1882 free(tmp);
1885 rev_hdl->status = 0;
1886 rev_hdl->ctdb = ctdb;
1887 rev_hdl->ctdb_db = ctdb_db;
1888 rev_hdl->fd[0] = -1;
1889 rev_hdl->fd[1] = -1;
1891 rev_hdl->key.dsize = key.dsize;
1892 rev_hdl->key.dptr = talloc_memdup(rev_hdl, key.dptr, key.dsize);
1893 if (rev_hdl->key.dptr == NULL) {
1894 D_ERR("Failed to allocate key for revokechild_handle\n");
1895 goto err_out;
1898 ret = pipe(rev_hdl->fd);
1899 if (ret != 0) {
1900 D_ERR("Failed to allocate key for revokechild_handle\n");
1901 goto err_out;
1905 rev_hdl->child = ctdb_fork(ctdb);
1906 if (rev_hdl->child == (pid_t)-1) {
1907 D_ERR("Failed to fork child for revokechild\n");
1908 goto err_out;
1911 if (rev_hdl->child == 0) {
1912 char c = 0;
1913 close(rev_hdl->fd[0]);
1915 prctl_set_comment("ctdb_revokechild");
1916 if (switch_from_server_to_client(ctdb) != 0) {
1917 D_ERR("Failed to switch from server to client "
1918 "for revokechild process\n");
1919 c = 1;
1920 goto child_finished;
1923 c = ctdb_revoke_all_delegations(ctdb,
1924 ctdb_db,
1925 tdata,
1926 key,
1927 header,
1928 data);
1930 child_finished:
1931 sys_write(rev_hdl->fd[1], &c, 1);
1932 ctdb_wait_for_process_to_exit(parent);
1933 _exit(0);
1936 close(rev_hdl->fd[1]);
1937 rev_hdl->fd[1] = -1;
1938 set_close_on_exec(rev_hdl->fd[0]);
1940 rev_hdl->fde = tevent_add_fd(ctdb->ev,
1941 rev_hdl,
1942 rev_hdl->fd[0],
1943 TEVENT_FD_READ,
1944 revokechild_handler,
1945 (void *)rev_hdl);
1947 if (rev_hdl->fde == NULL) {
1948 D_ERR("Failed to set up fd event for revokechild process\n");
1949 talloc_free(rev_hdl);
1951 tevent_fd_set_auto_close(rev_hdl->fde);
1953 /* This is an active revokechild child process */
1954 DLIST_ADD_END(ctdb_db->revokechild_active, rev_hdl);
1955 talloc_set_destructor(rev_hdl, revokechild_destructor);
1957 return 0;
1958 err_out:
1959 talloc_free(rev_hdl);
1960 return -1;
1963 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1965 struct revokechild_handle *rev_hdl;
1966 struct revokechild_deferred_call *deferred_call;
1968 for (rev_hdl = ctdb_db->revokechild_active;
1969 rev_hdl;
1970 rev_hdl = rev_hdl->next) {
1971 if (rev_hdl->key.dsize == 0) {
1972 continue;
1974 if (rev_hdl->key.dsize != key.dsize) {
1975 continue;
1977 if (!memcmp(rev_hdl->key.dptr, key.dptr, key.dsize)) {
1978 break;
1982 if (rev_hdl == NULL) {
1983 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1984 return -1;
1987 deferred_call = talloc(call_context, struct revokechild_deferred_call);
1988 if (deferred_call == NULL) {
1989 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1990 return -1;
1993 deferred_call->ctdb = ctdb;
1994 deferred_call->hdr = talloc_steal(deferred_call, hdr);
1995 deferred_call->fn = fn;
1996 deferred_call->ctx = call_context;
1997 deferred_call->rev_hdl = rev_hdl;
1999 talloc_set_destructor(deferred_call, deferred_call_destructor);
2001 DLIST_ADD(rev_hdl->deferred_call_list, deferred_call);
2003 return 0;
2006 static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter,
2007 void *private_data)
2009 struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
2010 private_data, struct ctdb_db_context);
2011 unsigned int value;
2013 value = (counter < INT_MAX ? counter : INT_MAX);
2014 ctdb_update_db_stat_hot_keys(ctdb_db, key, value);
2017 static void ctdb_migration_cleandb_event(struct tevent_context *ev,
2018 struct tevent_timer *te,
2019 struct timeval current_time,
2020 void *private_data)
2022 struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
2023 private_data, struct ctdb_db_context);
2025 if (ctdb_db->migratedb == NULL) {
2026 return;
2029 hash_count_expire(ctdb_db->migratedb, NULL);
2031 te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
2032 tevent_timeval_current_ofs(10, 0),
2033 ctdb_migration_cleandb_event, ctdb_db);
2034 if (te == NULL) {
2035 DEBUG(DEBUG_ERR,
2036 ("Memory error in migration cleandb event for %s\n",
2037 ctdb_db->db_name));
2038 TALLOC_FREE(ctdb_db->migratedb);
2042 int ctdb_migration_init(struct ctdb_db_context *ctdb_db)
2044 struct timeval one_second = { 1, 0 };
2045 struct tevent_timer *te;
2046 int ret;
2048 if (! ctdb_db_volatile(ctdb_db)) {
2049 return 0;
2052 ret = hash_count_init(ctdb_db, one_second,
2053 ctdb_migration_count_handler, ctdb_db,
2054 &ctdb_db->migratedb);
2055 if (ret != 0) {
2056 DEBUG(DEBUG_ERR,
2057 ("Memory error in migration init for %s\n",
2058 ctdb_db->db_name));
2059 return -1;
2062 te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
2063 tevent_timeval_current_ofs(10, 0),
2064 ctdb_migration_cleandb_event, ctdb_db);
2065 if (te == NULL) {
2066 DEBUG(DEBUG_ERR,
2067 ("Memory error in migration init for %s\n",
2068 ctdb_db->db_name));
2069 TALLOC_FREE(ctdb_db->migratedb);
2070 return -1;
2073 return 0;