[Samba.git] / ctdb / server / ctdb_call.c
1 /*
2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/rb_tree.h"
40 #include "common/reqid.h"
41 #include "common/system.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44 #include "common/hash_count.h"
46 struct ctdb_sticky_record {
47 struct ctdb_context *ctdb;
48 struct ctdb_db_context *ctdb_db;
49 TDB_CONTEXT *pindown;
53 find the ctdb_db from a db id
55 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
57 struct ctdb_db_context *ctdb_db;
59 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
60 if (ctdb_db->db_id == id) {
61 break;
64 return ctdb_db;
68 a variant of the input packet handler that can be used in lock requeue
70 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
72 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
73 ctdb_input_pkt(ctdb, hdr);
78 send an error reply
80 static void ctdb_send_error(struct ctdb_context *ctdb,
81 struct ctdb_req_header *hdr, uint32_t status,
82 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
83 static void ctdb_send_error(struct ctdb_context *ctdb,
84 struct ctdb_req_header *hdr, uint32_t status,
85 const char *fmt, ...)
87 va_list ap;
88 struct ctdb_reply_error_old *r;
89 char *msg;
90 int msglen, len;
92 if (ctdb->methods == NULL) {
93 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
94 return;
97 va_start(ap, fmt);
98 msg = talloc_vasprintf(ctdb, fmt, ap);
99 if (msg == NULL) {
100 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
102 va_end(ap);
104 msglen = strlen(msg)+1;
105 len = offsetof(struct ctdb_reply_error_old, msg);
106 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
107 struct ctdb_reply_error_old);
108 CTDB_NO_MEMORY_FATAL(ctdb, r);
110 r->hdr.destnode = hdr->srcnode;
111 r->hdr.reqid = hdr->reqid;
112 r->status = status;
113 r->msglen = msglen;
114 memcpy(&r->msg[0], msg, msglen);
116 ctdb_queue_packet(ctdb, &r->hdr);
118 talloc_free(msg);
123 * send a redirect reply
125 * The logic behind this function is this:
127 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
128 * to its local ctdb (ctdb_request_call). If the node is not itself
129 * the record's DMASTER, it first redirects the packet to the
130 * record's LMASTER. The LMASTER then redirects the call packet to
131 * the current DMASTER. Note that this works because, when a record
132 * is migrated off a node, the new DMASTER is stored in the record's
133 * copy on the former DMASTER.
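*
* In the common case the request therefore takes at most two forwarding
* hops (requesting node -> lmaster -> current dmaster). If the record
* keeps migrating while the request is in flight it can bounce further,
* which is what the high-hopcount warning below catches.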
135 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
136 struct ctdb_db_context *ctdb_db,
137 TDB_DATA key,
138 struct ctdb_req_call_old *c,
139 struct ctdb_ltdb_header *header)
141 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
143 c->hdr.destnode = lmaster;
144 if (ctdb->pnn == lmaster) {
145 c->hdr.destnode = header->dmaster;
147 c->hopcount++;
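/* warn only for a few hopcount values in every hundred (96-99), so a
   record that keeps bouncing between nodes does not flood the log with
   one warning per hop */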
149 if (c->hopcount%100 > 95) {
150 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
151 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
152 "header->dmaster:%d dst:%d\n",
153 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
154 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
155 header->dmaster, c->hdr.destnode));
158 ctdb_queue_packet(ctdb, &c->hdr);
163 send a dmaster reply
165 The caller must hold the chainlock before calling this routine, and must
166 be the lmaster.
168 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
169 struct ctdb_ltdb_header *header,
170 TDB_DATA key, TDB_DATA data,
171 uint32_t new_dmaster,
172 uint32_t reqid)
174 struct ctdb_context *ctdb = ctdb_db->ctdb;
175 struct ctdb_reply_dmaster_old *r;
176 int ret, len;
177 TALLOC_CTX *tmp_ctx;
179 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
180 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
181 return;
184 header->dmaster = new_dmaster;
185 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
186 if (ret != 0) {
187 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
188 return;
191 if (ctdb->methods == NULL) {
192 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply can't update dmaster since transport is down");
193 return;
196 /* put the packet on a temporary context, allowing us to safely free
197 it below even if ctdb_reply_dmaster() has freed it already */
198 tmp_ctx = talloc_new(ctdb);
200 /* send the CTDB_REPLY_DMASTER */
201 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
202 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
203 struct ctdb_reply_dmaster_old);
204 CTDB_NO_MEMORY_FATAL(ctdb, r);
206 r->hdr.destnode = new_dmaster;
207 r->hdr.reqid = reqid;
208 r->hdr.generation = ctdb_db->generation;
209 r->rsn = header->rsn;
210 r->keylen = key.dsize;
211 r->datalen = data.dsize;
212 r->db_id = ctdb_db->db_id;
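/* pack the variable-length part of the reply: the key, then the data,
   then the record flags as a trailing uint32_t (matching the length
   computed above) */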
213 memcpy(&r->data[0], key.dptr, key.dsize);
214 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
215 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
217 ctdb_queue_packet(ctdb, &r->hdr);
219 talloc_free(tmp_ctx);
223 send a dmaster request (give another node the dmaster for a record)
225 This is always sent to the lmaster, which ensures that the lmaster
226 always knows who the dmaster is. The lmaster will then send a
227 CTDB_REPLY_DMASTER to the new dmaster
229 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
230 struct ctdb_req_call_old *c,
231 struct ctdb_ltdb_header *header,
232 TDB_DATA *key, TDB_DATA *data)
234 struct ctdb_req_dmaster_old *r;
235 struct ctdb_context *ctdb = ctdb_db->ctdb;
236 int len;
237 uint32_t lmaster = ctdb_lmaster(ctdb, key);
239 if (ctdb->methods == NULL) {
240 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
241 return;
244 if (data->dsize != 0) {
245 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
248 if (lmaster == ctdb->pnn) {
249 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
250 c->hdr.srcnode, c->hdr.reqid);
251 return;
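/* otherwise build a CTDB_REQ_DMASTER for the lmaster; as with the reply,
   the data[] area carries the key, the data and a trailing uint32_t of
   record flags */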
254 len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
255 + sizeof(uint32_t);
256 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
257 struct ctdb_req_dmaster_old);
258 CTDB_NO_MEMORY_FATAL(ctdb, r);
259 r->hdr.destnode = lmaster;
260 r->hdr.reqid = c->hdr.reqid;
261 r->hdr.generation = ctdb_db->generation;
262 r->db_id = c->db_id;
263 r->rsn = header->rsn;
264 r->dmaster = c->hdr.srcnode;
265 r->keylen = key->dsize;
266 r->datalen = data->dsize;
267 memcpy(&r->data[0], key->dptr, key->dsize);
268 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
269 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
271 header->dmaster = c->hdr.srcnode;
272 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
273 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
276 ctdb_queue_packet(ctdb, &r->hdr);
278 talloc_free(r);
281 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
282 struct tevent_timer *te,
283 struct timeval t, void *private_data)
285 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
286 struct ctdb_sticky_record);
288 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
289 if (sr->pindown != NULL) {
290 talloc_free(sr->pindown);
291 sr->pindown = NULL;
295 static int
296 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
298 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
299 uint32_t *k;
300 struct ctdb_sticky_record *sr;
302 k = ctdb_key_to_idkey(tmp_ctx, key);
303 if (k == NULL) {
304 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
305 talloc_free(tmp_ctx);
306 return -1;
309 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
310 if (sr == NULL) {
311 talloc_free(tmp_ctx);
312 return 0;
315 talloc_free(tmp_ctx);
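/* if the record is not already pinned down, create a pindown context and
   arm a timeout on it; sticky_pindown is in milliseconds, hence the split
   into seconds and microseconds below */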
317 if (sr->pindown == NULL) {
318 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
319 sr->pindown = talloc_new(sr);
320 if (sr->pindown == NULL) {
321 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
322 return -1;
324 tevent_add_timer(ctdb->ev, sr->pindown,
325 timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
326 (ctdb->tunable.sticky_pindown * 1000) % 1000000),
327 ctdb_sticky_pindown_timeout, sr);
330 return 0;
334 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
335 gets a CTDB_REQ_DMASTER for itself. We become the dmaster.
337 must be called with the chainlock held. This function releases the chainlock
339 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
340 struct ctdb_req_header *hdr,
341 TDB_DATA key, TDB_DATA data,
342 uint64_t rsn, uint32_t record_flags)
344 struct ctdb_call_state *state;
345 struct ctdb_context *ctdb = ctdb_db->ctdb;
346 struct ctdb_ltdb_header header;
347 int ret;
349 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
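/* build a fresh local header: we take over as dmaster and adopt the rsn
   and record flags supplied by the sender */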
351 ZERO_STRUCT(header);
352 header.rsn = rsn;
353 header.dmaster = ctdb->pnn;
354 header.flags = record_flags;
356 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
358 if (state) {
359 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
361 * We temporarily add the VACUUM_MIGRATED flag to
362 * the record flags, so that ctdb_ltdb_store can
363 * decide whether the record should be stored or
364 * deleted.
366 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
370 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
371 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
373 ret = ctdb_ltdb_unlock(ctdb_db, key);
374 if (ret != 0) {
375 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
377 return;
380 /* we just became DMASTER and this database is "sticky",
381 see if the record is flagged as "hot" and set up a pin-down
382 context to stop migrations for a little while if so
384 if (ctdb_db_sticky(ctdb_db)) {
385 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
388 if (state == NULL) {
389 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
390 ctdb->pnn, hdr->reqid, hdr->srcnode));
392 ret = ctdb_ltdb_unlock(ctdb_db, key);
393 if (ret != 0) {
394 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
396 return;
399 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
400 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
402 ret = ctdb_ltdb_unlock(ctdb_db, key);
403 if (ret != 0) {
404 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
406 return;
409 if (hdr->reqid != state->reqid) {
410 /* we found a record but it was the wrong one */
411 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u from node %u\n", hdr->reqid, hdr->srcnode));
413 ret = ctdb_ltdb_unlock(ctdb_db, key);
414 if (ret != 0) {
415 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
417 return;
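/* count this migration towards the hot-key statistics; migratedb is the
   hash_count context set up in ctdb_migration_init() */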
420 (void) hash_count_increment(ctdb_db->migratedb, key);
422 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
424 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
425 if (ret != 0) {
426 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
429 state->state = CTDB_CALL_DONE;
430 if (state->async.fn) {
431 state->async.fn(state);
435 struct dmaster_defer_call {
436 struct dmaster_defer_call *next, *prev;
437 struct ctdb_context *ctdb;
438 struct ctdb_req_header *hdr;
441 struct dmaster_defer_queue {
442 struct ctdb_db_context *ctdb_db;
443 uint32_t generation;
444 struct dmaster_defer_call *deferred_calls;
447 static void dmaster_defer_reprocess(struct tevent_context *ev,
448 struct tevent_timer *te,
449 struct timeval t,
450 void *private_data)
452 struct dmaster_defer_call *call = talloc_get_type(
453 private_data, struct dmaster_defer_call);
455 ctdb_input_pkt(call->ctdb, call->hdr);
456 talloc_free(call);
459 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
461 /* Ignore the requests if a database recovery happened in the meantime. */
462 if (ddq->generation != ddq->ctdb_db->generation) {
463 return 0;
466 while (ddq->deferred_calls != NULL) {
467 struct dmaster_defer_call *call = ddq->deferred_calls;
469 DLIST_REMOVE(ddq->deferred_calls, call);
471 talloc_steal(call->ctdb, call);
472 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
473 dmaster_defer_reprocess, call);
475 return 0;
478 static void *insert_ddq_callback(void *parm, void *data)
480 if (data) {
481 talloc_free(data);
483 return parm;
487 * This function is used to register a key in the database that needs to be updated.
488 * Any requests for that key should get deferred until this is completed.
490 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
491 struct ctdb_req_header *hdr,
492 TDB_DATA key)
494 uint32_t *k;
495 struct dmaster_defer_queue *ddq;
497 k = ctdb_key_to_idkey(hdr, key);
498 if (k == NULL) {
499 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
500 return -1;
503 /* Already exists */
504 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
505 if (ddq != NULL) {
506 if (ddq->generation == ctdb_db->generation) {
507 talloc_free(k);
508 return 0;
511 /* Recovery occurred - get rid of old queue. All the deferred
512 * requests will be resent anyway from ctdb_call_resend_db.
514 talloc_free(ddq);
517 ddq = talloc(hdr, struct dmaster_defer_queue);
518 if (ddq == NULL) {
519 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
520 talloc_free(k);
521 return -1;
523 ddq->ctdb_db = ctdb_db;
524 ddq->generation = hdr->generation;
525 ddq->deferred_calls = NULL;
527 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
528 insert_ddq_callback, ddq);
529 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
531 talloc_free(k);
532 return 0;
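/* While a dmaster change for a key is in flight, later requests for the
   same key are parked on the queue above via dmaster_defer_add(); when
   the queue is freed (it is parented on the triggering request header)
   the destructor requeues them through zero-timeout timer events. */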
535 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
536 struct ctdb_req_header *hdr,
537 TDB_DATA key)
539 struct dmaster_defer_queue *ddq;
540 struct dmaster_defer_call *call;
541 uint32_t *k;
543 k = ctdb_key_to_idkey(hdr, key);
544 if (k == NULL) {
545 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
546 return -1;
549 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
550 if (ddq == NULL) {
551 talloc_free(k);
552 return -1;
555 talloc_free(k);
557 if (ddq->generation != hdr->generation) {
558 talloc_set_destructor(ddq, NULL);
559 talloc_free(ddq);
560 return -1;
563 call = talloc(ddq, struct dmaster_defer_call);
564 if (call == NULL) {
565 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
566 return -1;
569 call->ctdb = ctdb_db->ctdb;
570 call->hdr = talloc_steal(call, hdr);
572 DLIST_ADD_END(ddq->deferred_calls, call);
574 return 0;
578 called when a CTDB_REQ_DMASTER packet comes in
580 this comes into the lmaster for a record when the current dmaster
581 wants to give up the dmaster role and give it to someone else
583 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
585 struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
586 TDB_DATA key, data, data2;
587 struct ctdb_ltdb_header header;
588 struct ctdb_db_context *ctdb_db;
589 uint32_t record_flags = 0;
590 size_t len;
591 int ret;
593 key.dptr = c->data;
594 key.dsize = c->keylen;
595 data.dptr = c->data + c->keylen;
596 data.dsize = c->datalen;
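/* the packet may carry a trailing uint32_t with the record flags; only
   read it if the packet length says it is actually there */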
597 len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
598 + sizeof(uint32_t);
599 if (len <= c->hdr.length) {
600 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
601 sizeof(record_flags));
604 ctdb_db = find_ctdb_db(ctdb, c->db_id);
605 if (!ctdb_db) {
606 ctdb_send_error(ctdb, hdr, -1,
607 "Unknown database in request. db_id==0x%08x",
608 c->db_id);
609 return;
612 dmaster_defer_setup(ctdb_db, hdr, key);
614 /* fetch the current record */
615 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
616 ctdb_call_input_pkt, ctdb, false);
617 if (ret == -1) {
618 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
619 return;
621 if (ret == -2) {
622 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
623 return;
626 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
627 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
628 "db=%s lmaster=%u gen=%u curgen=%u\n",
629 ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
630 hdr->generation, ctdb_db->generation));
631 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
634 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
635 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
637 /* it's a protocol error if the sending node is not the current dmaster */
638 if (header.dmaster != hdr->srcnode) {
639 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
640 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
641 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
642 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
643 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
644 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
645 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
647 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
648 ctdb_ltdb_unlock(ctdb_db, key);
649 return;
653 if (header.rsn > c->rsn) {
654 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
655 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
656 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
657 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
660 /* use the rsn from the sending node */
661 header.rsn = c->rsn;
663 /* store the record flags from the sending node */
664 header.flags = record_flags;
666 /* check if the new dmaster is the lmaster, in which case we
667 skip the dmaster reply */
668 if (c->dmaster == ctdb->pnn) {
669 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
670 } else {
671 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
673 ret = ctdb_ltdb_unlock(ctdb_db, key);
674 if (ret != 0) {
675 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
680 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
681 struct tevent_timer *te,
682 struct timeval t, void *private_data)
684 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
685 struct ctdb_sticky_record);
686 talloc_free(sr);
689 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
691 if (data) {
692 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
693 talloc_free(data);
695 return parm;
698 static int
699 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
701 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
702 uint32_t *k;
703 struct ctdb_sticky_record *sr;
705 k = ctdb_key_to_idkey(tmp_ctx, key);
706 if (k == NULL) {
707 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
708 talloc_free(tmp_ctx);
709 return -1;
712 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
713 if (sr != NULL) {
714 talloc_free(tmp_ctx);
715 return 0;
718 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
719 if (sr == NULL) {
720 talloc_free(tmp_ctx);
721 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
722 return -1;
725 sr->ctdb = ctdb;
726 sr->ctdb_db = ctdb_db;
727 sr->pindown = NULL;
729 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
730 ctdb->tunable.sticky_duration,
731 ctdb_db->db_name, ctdb_hash(&key)));
733 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
735 tevent_add_timer(ctdb->ev, sr,
736 timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
737 ctdb_sticky_record_timeout, sr);
739 talloc_free(tmp_ctx);
740 return 0;
743 struct pinned_down_requeue_handle {
744 struct ctdb_context *ctdb;
745 struct ctdb_req_header *hdr;
748 struct pinned_down_deferred_call {
749 struct ctdb_context *ctdb;
750 struct ctdb_req_header *hdr;
753 static void pinned_down_requeue(struct tevent_context *ev,
754 struct tevent_timer *te,
755 struct timeval t, void *private_data)
757 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
758 struct ctdb_context *ctdb = handle->ctdb;
760 talloc_steal(ctdb, handle->hdr);
761 ctdb_call_input_pkt(ctdb, handle->hdr);
763 talloc_free(handle);
766 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
768 struct ctdb_context *ctdb = pinned_down->ctdb;
769 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
771 handle->ctdb = pinned_down->ctdb;
772 handle->hdr = pinned_down->hdr;
773 talloc_steal(handle, handle->hdr);
775 tevent_add_timer(ctdb->ev, handle, timeval_zero(),
776 pinned_down_requeue, handle);
778 return 0;
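/* A deferred request (see ctdb_defer_pinned_down_request() below) is
   parented on the pindown context; when the pindown times out and that
   context is freed, the destructor above fires and requeues the original
   packet via a zero-timeout event. */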
781 static int
782 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
784 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
785 uint32_t *k;
786 struct ctdb_sticky_record *sr;
787 struct pinned_down_deferred_call *pinned_down;
789 k = ctdb_key_to_idkey(tmp_ctx, key);
790 if (k == NULL) {
791 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
792 talloc_free(tmp_ctx);
793 return -1;
796 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
797 if (sr == NULL) {
798 talloc_free(tmp_ctx);
799 return -1;
802 talloc_free(tmp_ctx);
804 if (sr->pindown == NULL) {
805 return -1;
808 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
809 if (pinned_down == NULL) {
810 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
811 return -1;
814 pinned_down->ctdb = ctdb;
815 pinned_down->hdr = hdr;
817 talloc_set_destructor(pinned_down, pinned_down_destructor);
818 talloc_steal(pinned_down, hdr);
820 return 0;
823 static void
824 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key,
825 int count)
827 int i, id;
828 char *keystr;
830 /* smallest value is always at index 0 */
831 if (count <= ctdb_db->statistics.hot_keys[0].count) {
832 return;
835 /* see if we already know this key */
836 for (i = 0; i < MAX_HOT_KEYS; i++) {
837 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
838 continue;
840 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
841 continue;
843 /* found an entry for this key */
844 if (count <= ctdb_db->statistics.hot_keys[i].count) {
845 return;
847 ctdb_db->statistics.hot_keys[i].count = count;
848 goto sort_keys;
851 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
852 id = ctdb_db->statistics.num_hot_keys;
853 ctdb_db->statistics.num_hot_keys++;
854 } else {
855 id = 0;
858 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
859 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
861 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
862 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
863 ctdb_db->statistics.hot_keys[id].count = count;
865 keystr = hex_encode_talloc(ctdb_db,
866 (unsigned char *)key.dptr, key.dsize);
867 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=%s id=%d "
868 "count=%d\n", ctdb_db->db_name,
869 keystr ? keystr : "", id, count));
870 talloc_free(keystr);
872 sort_keys:
873 for (i = 1; i < MAX_HOT_KEYS; i++) {
874 if (ctdb_db->statistics.hot_keys[i].count == 0) {
875 continue;
877 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
878 count = ctdb_db->statistics.hot_keys[i].count;
879 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
880 ctdb_db->statistics.hot_keys[0].count = count;
882 key = ctdb_db->statistics.hot_keys[i].key;
883 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
884 ctdb_db->statistics.hot_keys[0].key = key;
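/* the loop above only needs to keep the smallest count at index 0, so
   that the cheap early-out test at the top of this function keeps
   working; the remaining entries stay unsorted */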
890 called when a CTDB_REQ_CALL packet comes in
892 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
894 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
895 TDB_DATA data;
896 struct ctdb_reply_call_old *r;
897 int ret, len;
898 struct ctdb_ltdb_header header;
899 struct ctdb_call *call;
900 struct ctdb_db_context *ctdb_db;
901 int tmp_count, bucket;
903 if (ctdb->methods == NULL) {
904 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
905 return;
909 ctdb_db = find_ctdb_db(ctdb, c->db_id);
910 if (!ctdb_db) {
911 ctdb_send_error(ctdb, hdr, -1,
912 "Unknown database in request. db_id==0x%08x",
913 c->db_id);
914 return;
917 call = talloc(hdr, struct ctdb_call);
918 CTDB_NO_MEMORY_FATAL(ctdb, call);
920 call->call_id = c->callid;
921 call->key.dptr = c->data;
922 call->key.dsize = c->keylen;
923 call->call_data.dptr = c->data + c->keylen;
924 call->call_data.dsize = c->calldatalen;
925 call->reply_data.dptr = NULL;
926 call->reply_data.dsize = 0;
929 /* If this record is pinned down we should defer the
930 request until the pindown times out
932 if (ctdb_db_sticky(ctdb_db)) {
933 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
934 DEBUG(DEBUG_WARNING,
935 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
936 talloc_free(call);
937 return;
941 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
942 talloc_free(call);
943 return;
946 /* determine if we are the dmaster for this key. This also
947 fetches the record data (if any), thus avoiding a 2nd fetch of the data
948 if the call will be answered locally */
950 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
951 ctdb_call_input_pkt, ctdb, false);
952 if (ret == -1) {
953 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
954 talloc_free(call);
955 return;
957 if (ret == -2) {
958 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
959 talloc_free(call);
960 return;
963 /* Don't do READONLY if we don't have a tracking database */
964 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
965 c->flags &= ~CTDB_WANT_READONLY;
968 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
969 header.flags &= ~CTDB_REC_RO_FLAGS;
970 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
971 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
972 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
973 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
975 /* and clear out the tracking data */
976 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
977 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
981 /* if we are revoking, we must defer all other calls until the revoke
982 * has completed.
984 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
985 talloc_free(data.dptr);
986 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
988 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
989 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
991 talloc_free(call);
992 return;
996 * If we are not the dmaster and are not hosting any delegations,
997 * then we redirect the request to the node that can answer it
998 * (the lmaster or the dmaster).
1000 if ((header.dmaster != ctdb->pnn)
1001 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
1002 talloc_free(data.dptr);
1003 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
1005 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1006 if (ret != 0) {
1007 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1009 talloc_free(call);
1010 return;
1013 if ( (!(c->flags & CTDB_WANT_READONLY))
1014 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1015 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1016 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1017 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1019 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1021 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1022 ctdb_fatal(ctdb, "Failed to start record revoke");
1024 talloc_free(data.dptr);
1026 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1027 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1029 talloc_free(call);
1031 return;
1034 /* If this is the first request for a delegation, bump the rsn and set
1035 * the delegations flag
1037 if ((c->flags & CTDB_WANT_READONLY)
1038 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1039 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1040 header.rsn += 3;
1041 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1042 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1043 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1046 if ((c->flags & CTDB_WANT_READONLY)
1047 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1048 TDB_DATA tdata;
1050 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1051 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1052 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1054 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1055 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1057 free(tdata.dptr);
1059 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1060 if (ret != 0) {
1061 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1064 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1065 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1066 struct ctdb_reply_call_old);
1067 CTDB_NO_MEMORY_FATAL(ctdb, r);
1068 r->hdr.destnode = c->hdr.srcnode;
1069 r->hdr.reqid = c->hdr.reqid;
1070 r->hdr.generation = ctdb_db->generation;
1071 r->status = 0;
1072 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
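/* the header shipped with the read-only copy gets a lower rsn than the
   authoritative record stored above, and is flagged HAVE_READONLY instead
   of HAVE_DELEGATIONS */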
1073 header.rsn -= 2;
1074 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1075 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1076 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1078 if (data.dsize) {
1079 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1082 ctdb_queue_packet(ctdb, &r->hdr);
1083 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1084 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1086 talloc_free(r);
1087 talloc_free(call);
1088 return;
1091 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
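/* record the hop count in a logarithmic histogram: hopcount 0 goes to
   bucket 0 and hopcount h >= 1 goes to bucket floor(log2(h)) + 1, capped
   at MAX_COUNT_BUCKETS - 1 */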
1092 tmp_count = c->hopcount;
1093 bucket = 0;
1094 while (tmp_count) {
1095 tmp_count >>= 1;
1096 bucket++;
1098 if (bucket >= MAX_COUNT_BUCKETS) {
1099 bucket = MAX_COUNT_BUCKETS - 1;
1101 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1102 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1104 /* If this database supports sticky records, then check if the
1105 hopcount is big. If it is, the record is hot and we
1106 should make it sticky.
1108 if (ctdb_db_sticky(ctdb_db) &&
1109 c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1110 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1114 /* Try, if possible, to migrate the record off to the caller node.
1115 * From the client's perspective a fetch of the data is just as
1116 * expensive as a migration.
1118 if (c->hdr.srcnode != ctdb->pnn) {
1119 if (ctdb_db->persistent_state) {
1120 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1121 " of key %s while transaction is active\n",
1122 (char *)call->key.dptr));
1123 } else {
1124 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1125 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1126 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1127 talloc_free(data.dptr);
1129 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1130 if (ret != 0) {
1131 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1134 talloc_free(call);
1135 return;
1138 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1139 if (ret != 0) {
1140 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1141 call->status = -1;
1144 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1145 if (ret != 0) {
1146 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1149 len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1150 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1151 struct ctdb_reply_call_old);
1152 CTDB_NO_MEMORY_FATAL(ctdb, r);
1153 r->hdr.destnode = hdr->srcnode;
1154 r->hdr.reqid = hdr->reqid;
1155 r->hdr.generation = ctdb_db->generation;
1156 r->status = call->status;
1157 r->datalen = call->reply_data.dsize;
1158 if (call->reply_data.dsize) {
1159 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1162 ctdb_queue_packet(ctdb, &r->hdr);
1164 talloc_free(r);
1165 talloc_free(call);
1169 * called when a CTDB_REPLY_CALL packet comes in
1171 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1172 * contains any reply data from the call
1174 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1176 struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1177 struct ctdb_call_state *state;
1179 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1180 if (state == NULL) {
1181 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1182 return;
1185 if (hdr->reqid != state->reqid) {
1186 /* we found a record but it was the wrong one */
1187 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1188 return;
1192 /* read only delegation processing */
1193 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1194 * delegation since we may need to update the record header
1196 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1197 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1198 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1199 struct ctdb_ltdb_header oldheader;
1200 TDB_DATA key, data, olddata;
1201 int ret;
1203 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1204 goto finished_ro;
1208 key.dsize = state->c->keylen;
1209 key.dptr = state->c->data;
1210 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1211 ctdb_call_input_pkt, ctdb, false);
1212 if (ret == -2) {
1213 return;
1215 if (ret != 0) {
1216 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1217 return;
1220 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1221 if (ret != 0) {
1222 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1223 ctdb_ltdb_unlock(ctdb_db, key);
1224 goto finished_ro;
1227 if (header->rsn <= oldheader.rsn) {
1228 ctdb_ltdb_unlock(ctdb_db, key);
1229 goto finished_ro;
1232 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1233 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1234 ctdb_ltdb_unlock(ctdb_db, key);
1235 goto finished_ro;
1238 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1239 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1240 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1241 if (ret != 0) {
1242 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1243 ctdb_ltdb_unlock(ctdb_db, key);
1244 goto finished_ro;
1247 ctdb_ltdb_unlock(ctdb_db, key);
1249 finished_ro:
1251 state->call->reply_data.dptr = c->data;
1252 state->call->reply_data.dsize = c->datalen;
1253 state->call->status = c->status;
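/* steal the reply packet onto the call state so that reply_data above
   stays valid for as long as the state exists */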
1255 talloc_steal(state, c);
1257 state->state = CTDB_CALL_DONE;
1258 if (state->async.fn) {
1259 state->async.fn(state);
1265 * called when a CTDB_REPLY_DMASTER packet comes in
1267 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1268 * request packet. It means that the current dmaster wants to give us
1269 * the dmaster role.
1271 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1273 struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1274 struct ctdb_db_context *ctdb_db;
1275 TDB_DATA key, data;
1276 uint32_t record_flags = 0;
1277 size_t len;
1278 int ret;
1280 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1281 if (ctdb_db == NULL) {
1282 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1283 return;
1286 key.dptr = c->data;
1287 key.dsize = c->keylen;
1288 data.dptr = &c->data[key.dsize];
1289 data.dsize = c->datalen;
1290 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1291 + sizeof(uint32_t);
1292 if (len <= c->hdr.length) {
1293 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1294 sizeof(record_flags));
1297 dmaster_defer_setup(ctdb_db, hdr, key);
1299 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1300 ctdb_call_input_pkt, ctdb, false);
1301 if (ret == -2) {
1302 return;
1304 if (ret != 0) {
1305 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1306 return;
1309 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1314 called when a CTDB_REPLY_ERROR packet comes in
1316 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1318 struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1319 struct ctdb_call_state *state;
1321 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1322 if (state == NULL) {
1323 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1324 ctdb->pnn, hdr->reqid));
1325 return;
1328 if (hdr->reqid != state->reqid) {
1329 /* we found a record but it was the wrong one */
1330 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1331 return;
1334 talloc_steal(state, c);
1336 state->state = CTDB_CALL_ERROR;
1337 state->errmsg = (char *)c->msg;
1338 if (state->async.fn) {
1339 state->async.fn(state);
1345 destroy a ctdb_call
1347 static int ctdb_call_destructor(struct ctdb_call_state *state)
1349 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1350 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1351 return 0;
1356 called when a ctdb_call needs to be resent after a reconfigure event
1358 static void ctdb_call_resend(struct ctdb_call_state *state)
1360 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1362 state->generation = state->ctdb_db->generation;
1364 /* use a new reqid, in case the old reply does eventually come in */
1365 reqid_remove(ctdb->idr, state->reqid);
1366 state->reqid = reqid_new(ctdb->idr, state);
1367 state->c->hdr.reqid = state->reqid;
1369 /* update the generation count for this request, so it's valid with the new vnn_map */
1370 state->c->hdr.generation = state->generation;
1372 /* send the packet to ourselves, it will be redirected appropriately */
1373 state->c->hdr.destnode = ctdb->pnn;
1375 ctdb_queue_packet(ctdb, &state->c->hdr);
1376 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1377 state->ctdb_db->db_name, state->reqid, state->generation));
1381 resend all pending calls on recovery
1383 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1385 struct ctdb_call_state *state, *next;
1387 for (state = ctdb_db->pending_calls; state; state = next) {
1388 next = state->next;
1389 ctdb_call_resend(state);
1393 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1395 struct ctdb_db_context *ctdb_db;
1397 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1398 ctdb_call_resend_db(ctdb_db);
1403 this allows the caller to set up an async.fn
1405 static void call_local_trigger(struct tevent_context *ev,
1406 struct tevent_timer *te,
1407 struct timeval t, void *private_data)
1409 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1410 if (state->async.fn) {
1411 state->async.fn(state);
1417 construct an event-driven local ctdb_call
1419 this is used so that locally processed ctdb_call requests are processed
1420 in an event-driven manner
1422 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1423 struct ctdb_call *call,
1424 struct ctdb_ltdb_header *header,
1425 TDB_DATA *data)
1427 struct ctdb_call_state *state;
1428 struct ctdb_context *ctdb = ctdb_db->ctdb;
1429 int ret;
1431 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1432 CTDB_NO_MEMORY_NULL(ctdb, state);
1434 talloc_steal(state, data->dptr);
1436 state->state = CTDB_CALL_DONE;
1437 state->call = talloc(state, struct ctdb_call);
1438 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1439 *(state->call) = *call;
1440 state->ctdb_db = ctdb_db;
1442 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1443 if (ret != 0) {
1444 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1447 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1448 call_local_trigger, state);
1450 return state;
1455 make a remote ctdb call - async send. Called in daemon context.
1457 This constructs a ctdb_call request and queues it for processing.
1458 This call never blocks.
1460 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1461 struct ctdb_call *call,
1462 struct ctdb_ltdb_header *header)
1464 uint32_t len;
1465 struct ctdb_call_state *state;
1466 struct ctdb_context *ctdb = ctdb_db->ctdb;
1467 struct ctdb_req_call_old *c;
1469 if (ctdb->methods == NULL) {
1470 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1471 return NULL;
1474 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1475 CTDB_NO_MEMORY_NULL(ctdb, state);
1476 state->call = talloc(state, struct ctdb_call);
1477 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1479 state->reqid = reqid_new(ctdb->idr, state);
1480 state->ctdb_db = ctdb_db;
1481 state->state = CTDB_CALL_WAIT;
1482 state->generation = ctdb_db->generation;
1484 len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize +
1485 call->call_data.dsize;
1487 c = ctdb_transport_allocate(ctdb,
1488 state,
1489 CTDB_REQ_CALL,
1490 len,
1491 struct ctdb_req_call_old);
1493 CTDB_NO_MEMORY_NULL(ctdb, c);
1494 state->c = c;
1496 c->hdr.destnode = header->dmaster;
1497 c->hdr.reqid = state->reqid;
1498 c->hdr.generation = ctdb_db->generation;
1499 c->flags = call->flags;
1500 c->db_id = ctdb_db->db_id;
1501 c->callid = call->call_id;
1502 c->hopcount = 0;
1503 c->keylen = call->key.dsize;
1504 c->calldatalen = call->call_data.dsize;
1506 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
1507 memcpy(&c->data[call->key.dsize],
1508 call->call_data.dptr,
1509 call->call_data.dsize);
1511 *(state->call) = *call;
1512 state->call->call_data.dptr = &c->data[call->key.dsize];
1513 state->call->key.dptr = &c->data[0];
1515 DLIST_ADD(ctdb_db->pending_calls, state);
1517 talloc_set_destructor(state, ctdb_call_destructor);
1518 ctdb_queue_packet(ctdb, &state->c->hdr);
1520 return state;
1524 make a remote ctdb call - async recv - called in daemon context
1526 This is called when the program wants to wait for a ctdb_call to complete and get the
1527 results. This call will block unless the call has already completed.
1529 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1531 while (state->state < CTDB_CALL_DONE) {
1532 tevent_loop_once(state->ctdb_db->ctdb->ev);
1534 if (state->state != CTDB_CALL_DONE) {
1535 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1536 talloc_free(state);
1537 return -1;
1540 if (state->call->reply_data.dsize) {
1541 call->reply_data.dptr = talloc_memdup(call,
1542 state->call->reply_data.dptr,
1543 state->call->reply_data.dsize);
1544 call->reply_data.dsize = state->call->reply_data.dsize;
1545 } else {
1546 call->reply_data.dptr = NULL;
1547 call->reply_data.dsize = 0;
1549 call->status = state->call->status;
1550 talloc_free(state);
1551 return 0;
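/*
 * Typical usage pairs the two calls above, e.g. (sketch only):
 *
 *   struct ctdb_call_state *state;
 *
 *   state = ctdb_daemon_call_send_remote(ctdb_db, &call, &header);
 *   if (state == NULL || ctdb_daemon_call_recv(state, &call) != 0) {
 *       ... handle the error ...
 *   }
 *   ... use call.reply_data ...
 */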
1555 struct revokechild_deferred_call {
1556 struct revokechild_deferred_call *prev, *next;
1557 struct ctdb_context *ctdb;
1558 struct ctdb_req_header *hdr;
1559 deferred_requeue_fn fn;
1560 void *ctx;
1561 struct revokechild_handle *rev_hdl;
1564 struct revokechild_handle {
1565 struct revokechild_handle *next, *prev;
1566 struct ctdb_context *ctdb;
1567 struct ctdb_db_context *ctdb_db;
1568 struct tevent_fd *fde;
1569 int status;
1570 int fd[2];
1571 pid_t child;
1572 TDB_DATA key;
1573 struct revokechild_deferred_call *deferred_call_list;
1576 static void deferred_call_requeue(struct tevent_context *ev,
1577 struct tevent_timer *te,
1578 struct timeval t, void *private_data)
1580 struct revokechild_deferred_call *dlist = talloc_get_type_abort(
1581 private_data, struct revokechild_deferred_call);
1583 while (dlist != NULL) {
1584 struct revokechild_deferred_call *dcall = dlist;
1586 talloc_set_destructor(dcall, NULL);
1587 DLIST_REMOVE(dlist, dcall);
1588 dcall->fn(dcall->ctx, dcall->hdr);
1589 talloc_free(dcall);
1593 static int deferred_call_destructor(struct revokechild_deferred_call *dcall)
1595 struct revokechild_handle *rev_hdl = dcall->rev_hdl;
1597 DLIST_REMOVE(rev_hdl->deferred_call_list, dcall);
1598 return 0;
1601 static int revokechild_destructor(struct revokechild_handle *rev_hdl)
1603 struct revokechild_deferred_call *now_list = NULL;
1604 struct revokechild_deferred_call *delay_list = NULL;
1606 if (rev_hdl->fde != NULL) {
1607 talloc_free(rev_hdl->fde);
1610 if (rev_hdl->fd[0] != -1) {
1611 close(rev_hdl->fd[0]);
1613 if (rev_hdl->fd[1] != -1) {
1614 close(rev_hdl->fd[1]);
1616 ctdb_kill(rev_hdl->ctdb, rev_hdl->child, SIGKILL);
1618 DLIST_REMOVE(rev_hdl->ctdb_db->revokechild_active, rev_hdl);
1620 while (rev_hdl->deferred_call_list != NULL) {
1621 struct revokechild_deferred_call *dcall;
1623 dcall = rev_hdl->deferred_call_list;
1624 DLIST_REMOVE(rev_hdl->deferred_call_list, dcall);
1626 /* If revoke is successful, then first process all the calls
1627 * that need write access, and delay readonly requests by a
1628 * one second grace period.
1630 * If revoke is unsuccessful, most likely because of node
1631 * failure, delay all the pending requests, so database can
1632 * be recovered.
1635 if (rev_hdl->status == 0) {
1636 struct ctdb_req_call_old *c;
1638 c = (struct ctdb_req_call_old *)dcall->hdr;
1639 if (c->flags & CTDB_WANT_READONLY) {
1640 DLIST_ADD(delay_list, dcall);
1641 } else {
1642 DLIST_ADD(now_list, dcall);
1644 } else {
1645 DLIST_ADD(delay_list, dcall);
1649 if (now_list != NULL) {
1650 tevent_add_timer(rev_hdl->ctdb->ev,
1651 rev_hdl->ctdb_db,
1652 tevent_timeval_current_ofs(0, 0),
1653 deferred_call_requeue,
1654 now_list);
1657 if (delay_list != NULL) {
1658 tevent_add_timer(rev_hdl->ctdb->ev,
1659 rev_hdl->ctdb_db,
1660 tevent_timeval_current_ofs(1, 0),
1661 deferred_call_requeue,
1662 delay_list);
1665 return 0;
1668 static void revokechild_handler(struct tevent_context *ev,
1669 struct tevent_fd *fde,
1670 uint16_t flags, void *private_data)
1672 struct revokechild_handle *rev_hdl =
1673 talloc_get_type(private_data, struct revokechild_handle);
1674 int ret;
1675 char c;
1677 ret = sys_read(rev_hdl->fd[0], &c, 1);
1678 if (ret != 1) {
1679 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1680 rev_hdl->status = -1;
1681 talloc_free(rev_hdl);
1682 return;
1684 if (c != 0) {
1685 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1686 rev_hdl->status = -1;
1687 talloc_free(rev_hdl);
1688 return;
1691 talloc_free(rev_hdl);
1694 struct ctdb_revoke_state {
1695 struct ctdb_db_context *ctdb_db;
1696 TDB_DATA key;
1697 struct ctdb_ltdb_header *header;
1698 TDB_DATA data;
1699 int count;
1700 int status;
1701 int finished;
1704 static void update_record_cb(struct ctdb_client_control_state *state)
1706 struct ctdb_revoke_state *revoke_state;
1707 int ret;
1708 int32_t res;
1710 if (state == NULL) {
1711 return;
1713 revoke_state = state->async.private_data;
1715 state->async.fn = NULL;
1716 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1717 if ((ret != 0) || (res != 0)) {
1718 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1719 revoke_state->status = -1;
1722 revoke_state->count--;
1723 if (revoke_state->count <= 0) {
1724 revoke_state->finished = 1;
1728 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1730 struct ctdb_revoke_state *revoke_state = private_data;
1731 struct ctdb_client_control_state *state;
1733 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1734 if (state == NULL) {
1735 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1736 revoke_state->status = -1;
1737 return;
1739 state->async.fn = update_record_cb;
1740 state->async.private_data = revoke_state;
1742 revoke_state->count++;
1746 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1747 struct tevent_timer *te,
1748 struct timeval yt, void *private_data)
1750 struct ctdb_revoke_state *state = private_data;
1752 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1753 state->finished = 1;
1754 state->status = -1;
1757 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1759 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1760 struct ctdb_ltdb_header new_header;
1761 TDB_DATA new_data;
1763 state->ctdb_db = ctdb_db;
1764 state->key = key;
1765 state->header = header;
1766 state->data = data;
1768 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1770 tevent_add_timer(ctdb->ev, state,
1771 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1772 ctdb_revoke_timeout_handler, state);
1774 while (state->finished == 0) {
1775 tevent_loop_once(ctdb->ev);
1778 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1779 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1780 talloc_free(state);
1781 return -1;
1783 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1784 ctdb_ltdb_unlock(ctdb_db, key);
1785 DEBUG(DEBUG_ERR,("Failed to fetch tdb record in revokechild\n"));
1786 talloc_free(state);
1787 return -1;
1789 header->rsn++;
1790 if (new_header.rsn > header->rsn) {
1791 ctdb_ltdb_unlock(ctdb_db, key);
1792 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1793 talloc_free(state);
1794 return -1;
1796 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1797 ctdb_ltdb_unlock(ctdb_db, key);
1798 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1799 talloc_free(state);
1800 return -1;
1804 * If revoke succeeds on all nodes, the revoke is complete. Otherwise,
1805 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1807 if (state->status == 0) {
1808 new_header.rsn++;
1809 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1810 } else {
1811 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1812 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1814 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1815 ctdb_ltdb_unlock(ctdb_db, key);
1816 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1817 talloc_free(state);
1818 return -1;
1820 ctdb_ltdb_unlock(ctdb_db, key);
1822 talloc_free(state);
1823 return 0;
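/* ctdb_revoke_all_delegations() runs in the forked revoke child (see
   ctdb_start_revoke_ro_record() below); its return value is written as a
   single status byte down the pipe and picked up by revokechild_handler()
   in the parent. */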
1827 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb,
1828 struct ctdb_db_context *ctdb_db,
1829 TDB_DATA key,
1830 struct ctdb_ltdb_header *header,
1831 TDB_DATA data)
1833 TDB_DATA tdata;
1834 struct revokechild_handle *rev_hdl;
1835 pid_t parent = getpid();
1836 int ret;
1838 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY |
1839 CTDB_REC_RO_HAVE_DELEGATIONS |
1840 CTDB_REC_RO_HAVE_READONLY);
1842 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1843 header->rsn -= 1;
1845 rev_hdl = talloc_zero(ctdb_db, struct revokechild_handle);
1846 if (rev_hdl == NULL) {
1847 D_ERR("Failed to allocate revokechild_handle\n");
1848 return -1;
1851 tdata = tdb_fetch(ctdb_db->rottdb, key);
1852 if (tdata.dsize > 0) {
1853 uint8_t *tmp;
1855 tmp = tdata.dptr;
1856 tdata.dptr = talloc_memdup(rev_hdl, tdata.dptr, tdata.dsize);
1857 free(tmp);
1860 rev_hdl->status = 0;
1861 rev_hdl->ctdb = ctdb;
1862 rev_hdl->ctdb_db = ctdb_db;
1863 rev_hdl->fd[0] = -1;
1864 rev_hdl->fd[1] = -1;
1866 rev_hdl->key.dsize = key.dsize;
1867 rev_hdl->key.dptr = talloc_memdup(rev_hdl, key.dptr, key.dsize);
1868 if (rev_hdl->key.dptr == NULL) {
1869 D_ERR("Failed to allocate key for revokechild_handle\n");
1870 goto err_out;
1873 ret = pipe(rev_hdl->fd);
1874 if (ret != 0) {
1875 D_ERR("Failed to create pipe for revokechild_handle\n");
1876 goto err_out;
1880 rev_hdl->child = ctdb_fork(ctdb);
1881 if (rev_hdl->child == (pid_t)-1) {
1882 D_ERR("Failed to fork child for revokechild\n");
1883 goto err_out;
1886 if (rev_hdl->child == 0) {
1887 char c = 0;
1888 close(rev_hdl->fd[0]);
1890 prctl_set_comment("ctdb_revokechild");
1891 if (switch_from_server_to_client(ctdb) != 0) {
1892 D_ERR("Failed to switch from server to client "
1893 "for revokechild process\n");
1894 c = 1;
1895 goto child_finished;
1898 c = ctdb_revoke_all_delegations(ctdb,
1899 ctdb_db,
1900 tdata,
1901 key,
1902 header,
1903 data);
1905 child_finished:
1906 sys_write(rev_hdl->fd[1], &c, 1);
1907 ctdb_wait_for_process_to_exit(parent);
1908 _exit(0);
1911 close(rev_hdl->fd[1]);
1912 rev_hdl->fd[1] = -1;
1913 set_close_on_exec(rev_hdl->fd[0]);
1915 rev_hdl->fde = tevent_add_fd(ctdb->ev,
1916 rev_hdl,
1917 rev_hdl->fd[0],
1918 TEVENT_FD_READ,
1919 revokechild_handler,
1920 (void *)rev_hdl);
1922 if (rev_hdl->fde == NULL) {
1923 D_ERR("Failed to set up fd event for revokechild process\n");
1924 talloc_free(rev_hdl);
1926 tevent_fd_set_auto_close(rev_hdl->fde);
1928 /* This is an active revokechild child process */
1929 DLIST_ADD_END(ctdb_db->revokechild_active, rev_hdl);
1930 talloc_set_destructor(rev_hdl, revokechild_destructor);
1932 return 0;
1933 err_out:
1934 talloc_free(rev_hdl);
1935 return -1;
1938 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1940 struct revokechild_handle *rev_hdl;
1941 struct revokechild_deferred_call *deferred_call;
1943 for (rev_hdl = ctdb_db->revokechild_active;
1944 rev_hdl;
1945 rev_hdl = rev_hdl->next) {
1946 if (rev_hdl->key.dsize == 0) {
1947 continue;
1949 if (rev_hdl->key.dsize != key.dsize) {
1950 continue;
1952 if (!memcmp(rev_hdl->key.dptr, key.dptr, key.dsize)) {
1953 break;
1957 if (rev_hdl == NULL) {
1958 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1959 return -1;
1962 deferred_call = talloc(call_context, struct revokechild_deferred_call);
1963 if (deferred_call == NULL) {
1964 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1965 return -1;
1968 deferred_call->ctdb = ctdb;
1969 deferred_call->hdr = talloc_steal(deferred_call, hdr);
1970 deferred_call->fn = fn;
1971 deferred_call->ctx = call_context;
1972 deferred_call->rev_hdl = rev_hdl;
1974 talloc_set_destructor(deferred_call, deferred_call_destructor);
1976 DLIST_ADD(rev_hdl->deferred_call_list, deferred_call);
1978 return 0;
1981 static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter,
1982 void *private_data)
1984 struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1985 private_data, struct ctdb_db_context);
1986 int value;
1988 value = (counter < INT_MAX ? counter : INT_MAX);
1989 ctdb_update_db_stat_hot_keys(ctdb_db, key, value);
1992 static void ctdb_migration_cleandb_event(struct tevent_context *ev,
1993 struct tevent_timer *te,
1994 struct timeval current_time,
1995 void *private_data)
1997 struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1998 private_data, struct ctdb_db_context);
2000 if (ctdb_db->migratedb == NULL) {
2001 return;
2004 hash_count_expire(ctdb_db->migratedb, NULL);
2006 te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
2007 tevent_timeval_current_ofs(10, 0),
2008 ctdb_migration_cleandb_event, ctdb_db);
2009 if (te == NULL) {
2010 DEBUG(DEBUG_ERR,
2011 ("Memory error in migration cleandb event for %s\n",
2012 ctdb_db->db_name));
2013 TALLOC_FREE(ctdb_db->migratedb);
2017 int ctdb_migration_init(struct ctdb_db_context *ctdb_db)
2019 struct timeval one_second = { 1, 0 };
2020 struct tevent_timer *te;
2021 int ret;
2023 if (! ctdb_db_volatile(ctdb_db)) {
2024 return 0;
2027 ret = hash_count_init(ctdb_db, one_second,
2028 ctdb_migration_count_handler, ctdb_db,
2029 &ctdb_db->migratedb);
2030 if (ret != 0) {
2031 DEBUG(DEBUG_ERR,
2032 ("Memory error in migration init for %s\n",
2033 ctdb_db->db_name));
2034 return -1;
2037 te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
2038 tevent_timeval_current_ofs(10, 0),
2039 ctdb_migration_cleandb_event, ctdb_db);
2040 if (te == NULL) {
2041 DEBUG(DEBUG_ERR,
2042 ("Memory error in migration init for %s\n",
2043 ctdb_db->db_name));
2044 TALLOC_FREE(ctdb_db->migratedb);
2045 return -1;
2048 return 0;