s4:torture:raw:notify: treat torture_open_connection calls with torture_assert
[Samba.git] / ctdb / server / ctdb_call.c
blob391dfb1ab5a6f0e6474edcf82b8cde59b8132b3d
1 /*
2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record {
32 struct ctdb_context *ctdb;
33 struct ctdb_db_context *ctdb_db;
34 TDB_CONTEXT *pindown;
38 find the ctdb_db from a db index
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
46 break;
49 return ctdb_db;
53 a varient of input packet that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
63 send an error reply
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
70 const char *fmt, ...)
72 va_list ap;
73 struct ctdb_reply_error *r;
74 char *msg;
75 int msglen, len;
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79 return;
82 va_start(ap, fmt);
83 msg = talloc_vasprintf(ctdb, fmt, ap);
84 if (msg == NULL) {
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
87 va_end(ap);
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
97 r->status = status;
98 r->msglen = msglen;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
103 talloc_free(msg);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. Note that this works because of this: When
117 * a record is migrated off a node, then the new DMASTER is stored
118 * in the record's copy on the former DMASTER.
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121 struct ctdb_db_context *ctdb_db,
122 TDB_DATA key,
123 struct ctdb_req_call *c,
124 struct ctdb_ltdb_header *header)
126 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
128 c->hdr.destnode = lmaster;
129 if (ctdb->pnn == lmaster) {
130 c->hdr.destnode = header->dmaster;
132 c->hopcount++;
134 if (c->hopcount%100 > 95) {
135 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
136 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
139 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
140 header->dmaster, c->hdr.destnode));
143 ctdb_queue_packet(ctdb, &c->hdr);
148 send a dmaster reply
150 caller must have the chainlock before calling this routine. Caller must be
151 the lmaster
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154 struct ctdb_ltdb_header *header,
155 TDB_DATA key, TDB_DATA data,
156 uint32_t new_dmaster,
157 uint32_t reqid)
159 struct ctdb_context *ctdb = ctdb_db->ctdb;
160 struct ctdb_reply_dmaster *r;
161 int ret, len;
162 TALLOC_CTX *tmp_ctx;
164 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
166 return;
169 header->dmaster = new_dmaster;
170 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
171 if (ret != 0) {
172 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
173 return;
176 if (ctdb->methods == NULL) {
177 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
178 return;
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx = talloc_new(ctdb);
185 /* send the CTDB_REPLY_DMASTER */
186 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188 struct ctdb_reply_dmaster);
189 CTDB_NO_MEMORY_FATAL(ctdb, r);
191 r->hdr.destnode = new_dmaster;
192 r->hdr.reqid = reqid;
193 r->rsn = header->rsn;
194 r->keylen = key.dsize;
195 r->datalen = data.dsize;
196 r->db_id = ctdb_db->db_id;
197 memcpy(&r->data[0], key.dptr, key.dsize);
198 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
199 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
201 ctdb_queue_packet(ctdb, &r->hdr);
203 talloc_free(tmp_ctx);
207 send a dmaster request (give another node the dmaster for a record)
209 This is always sent to the lmaster, which ensures that the lmaster
210 always knows who the dmaster is. The lmaster will then send a
211 CTDB_REPLY_DMASTER to the new dmaster
213 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
214 struct ctdb_req_call *c,
215 struct ctdb_ltdb_header *header,
216 TDB_DATA *key, TDB_DATA *data)
218 struct ctdb_req_dmaster *r;
219 struct ctdb_context *ctdb = ctdb_db->ctdb;
220 int len;
221 uint32_t lmaster = ctdb_lmaster(ctdb, key);
223 if (ctdb->methods == NULL) {
224 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
225 return;
228 if (data->dsize != 0) {
229 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
232 if (lmaster == ctdb->pnn) {
233 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
234 c->hdr.srcnode, c->hdr.reqid);
235 return;
238 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
239 + sizeof(uint32_t);
240 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
241 struct ctdb_req_dmaster);
242 CTDB_NO_MEMORY_FATAL(ctdb, r);
243 r->hdr.destnode = lmaster;
244 r->hdr.reqid = c->hdr.reqid;
245 r->db_id = c->db_id;
246 r->rsn = header->rsn;
247 r->dmaster = c->hdr.srcnode;
248 r->keylen = key->dsize;
249 r->datalen = data->dsize;
250 memcpy(&r->data[0], key->dptr, key->dsize);
251 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
252 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
254 header->dmaster = c->hdr.srcnode;
255 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
256 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
259 ctdb_queue_packet(ctdb, &r->hdr);
261 talloc_free(r);
264 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
265 struct timeval t, void *private_data)
267 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
268 struct ctdb_sticky_record);
270 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
271 if (sr->pindown != NULL) {
272 talloc_free(sr->pindown);
273 sr->pindown = NULL;
277 static int
278 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
280 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
281 uint32_t *k;
282 struct ctdb_sticky_record *sr;
284 k = ctdb_key_to_idkey(tmp_ctx, key);
285 if (k == NULL) {
286 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
287 talloc_free(tmp_ctx);
288 return -1;
291 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
292 if (sr == NULL) {
293 talloc_free(tmp_ctx);
294 return 0;
297 talloc_free(tmp_ctx);
299 if (sr->pindown == NULL) {
300 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
301 sr->pindown = talloc_new(sr);
302 if (sr->pindown == NULL) {
303 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
304 return -1;
306 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
309 return 0;
313 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
314 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
316 must be called with the chainlock held. This function releases the chainlock
318 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
319 struct ctdb_req_header *hdr,
320 TDB_DATA key, TDB_DATA data,
321 uint64_t rsn, uint32_t record_flags)
323 struct ctdb_call_state *state;
324 struct ctdb_context *ctdb = ctdb_db->ctdb;
325 struct ctdb_ltdb_header header;
326 int ret;
328 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
330 ZERO_STRUCT(header);
331 header.rsn = rsn;
332 header.dmaster = ctdb->pnn;
333 header.flags = record_flags;
335 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
337 if (state) {
338 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
340 * We temporarily add the VACUUM_MIGRATED flag to
341 * the record flags, so that ctdb_ltdb_store can
342 * decide whether the record should be stored or
343 * deleted.
345 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
349 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
350 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
352 ret = ctdb_ltdb_unlock(ctdb_db, key);
353 if (ret != 0) {
354 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
356 return;
359 /* we just became DMASTER and this database is "sticky",
360 see if the record is flagged as "hot" and set up a pin-down
361 context to stop migrations for a little while if so
363 if (ctdb_db->sticky) {
364 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
367 if (state == NULL) {
368 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
369 ctdb->pnn, hdr->reqid, hdr->srcnode));
371 ret = ctdb_ltdb_unlock(ctdb_db, key);
372 if (ret != 0) {
373 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
375 return;
378 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
379 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
381 ret = ctdb_ltdb_unlock(ctdb_db, key);
382 if (ret != 0) {
383 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
385 return;
388 if (hdr->reqid != state->reqid) {
389 /* we found a record but it was the wrong one */
390 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
392 ret = ctdb_ltdb_unlock(ctdb_db, key);
393 if (ret != 0) {
394 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
396 return;
399 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
401 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
402 if (ret != 0) {
403 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
406 state->state = CTDB_CALL_DONE;
407 if (state->async.fn) {
408 state->async.fn(state);
/* One request packet parked on a dmaster_defer_queue. */
struct dmaster_defer_call {
	struct dmaster_defer_call *next;	/* DLIST linkage */
	struct dmaster_defer_call *prev;	/* DLIST linkage */
	struct ctdb_context *ctdb;		/* context the packet is re-injected into */
	struct ctdb_req_header *hdr;		/* the deferred packet (owned by this call) */
};
/*
 * Per-key queue of requests that arrive while a CTDB_REQ_DMASTER for the
 * same key is being processed.
 */
struct dmaster_defer_queue {
	struct ctdb_context *ctdb;
	uint32_t generation;				/* generation of the packet that created the queue */
	struct dmaster_defer_call *deferred_calls;	/* DLIST head of parked calls */
};
424 static void dmaster_defer_reprocess(struct tevent_context *ev,
425 struct tevent_timer *te,
426 struct timeval t,
427 void *private_data)
429 struct dmaster_defer_call *call = talloc_get_type(
430 private_data, struct dmaster_defer_call);
432 ctdb_input_pkt(call->ctdb, call->hdr);
433 talloc_free(call);
436 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
438 /* Ignore requests, if database recovery happens in-between. */
439 if (ddq->generation != ddq->ctdb->vnn_map->generation) {
440 return 0;
443 while (ddq->deferred_calls != NULL) {
444 struct dmaster_defer_call *call = ddq->deferred_calls;
446 DLIST_REMOVE(ddq->deferred_calls, call);
448 talloc_steal(call->ctdb, call);
449 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
450 dmaster_defer_reprocess, call);
452 return 0;
/*
 * Insert callback for ctdb_db->defer_dmaster: frees any existing entry
 * passed in @data and returns the new queue @parm.
 */
static void *insert_ddq_callback(void *parm, void *data)
{
	void *previous = data;

	if (previous != NULL) {
		talloc_free(previous);
	}

	return parm;
}
464 * This function is used to reigster a key in database that needs to be updated.
465 * Any requests for that key should get deferred till this is completed.
467 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
468 struct ctdb_req_header *hdr,
469 TDB_DATA key)
471 uint32_t *k;
472 struct dmaster_defer_queue *ddq;
474 k = ctdb_key_to_idkey(hdr, key);
475 if (k == NULL) {
476 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
477 return -1;
480 /* Already exists */
481 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
482 if (ddq != NULL) {
483 talloc_free(k);
484 return 0;
487 ddq = talloc(hdr, struct dmaster_defer_queue);
488 if (ddq == NULL) {
489 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
490 talloc_free(k);
491 return -1;
493 ddq->ctdb = ctdb_db->ctdb;
494 ddq->generation = hdr->generation;
495 ddq->deferred_calls = NULL;
497 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
498 insert_ddq_callback, ddq);
499 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
501 talloc_free(k);
502 return 0;
505 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
506 struct ctdb_req_header *hdr,
507 TDB_DATA key)
509 struct dmaster_defer_queue *ddq;
510 struct dmaster_defer_call *call;
511 uint32_t *k;
513 k = ctdb_key_to_idkey(hdr, key);
514 if (k == NULL) {
515 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
516 return -1;
519 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
520 if (ddq == NULL) {
521 talloc_free(k);
522 return -1;
525 talloc_free(k);
527 if (ddq->generation != hdr->generation) {
528 talloc_set_destructor(ddq, NULL);
529 talloc_free(ddq);
530 return -1;
533 call = talloc(ddq, struct dmaster_defer_call);
534 if (call == NULL) {
535 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
536 return -1;
539 call->ctdb = ctdb_db->ctdb;
540 call->hdr = talloc_steal(call, hdr);
542 DLIST_ADD_END(ddq->deferred_calls, call, NULL);
544 return 0;
548 called when a CTDB_REQ_DMASTER packet comes in
550 this comes into the lmaster for a record when the current dmaster
551 wants to give up the dmaster role and give it to someone else
553 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
555 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
556 TDB_DATA key, data, data2;
557 struct ctdb_ltdb_header header;
558 struct ctdb_db_context *ctdb_db;
559 uint32_t record_flags = 0;
560 size_t len;
561 int ret;
563 key.dptr = c->data;
564 key.dsize = c->keylen;
565 data.dptr = c->data + c->keylen;
566 data.dsize = c->datalen;
567 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
568 + sizeof(uint32_t);
569 if (len <= c->hdr.length) {
570 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
571 sizeof(record_flags));
574 ctdb_db = find_ctdb_db(ctdb, c->db_id);
575 if (!ctdb_db) {
576 ctdb_send_error(ctdb, hdr, -1,
577 "Unknown database in request. db_id==0x%08x",
578 c->db_id);
579 return;
582 dmaster_defer_setup(ctdb_db, hdr, key);
584 /* fetch the current record */
585 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
586 ctdb_call_input_pkt, ctdb, false);
587 if (ret == -1) {
588 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
589 return;
591 if (ret == -2) {
592 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
593 return;
596 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
597 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
598 ctdb->pnn, ctdb_lmaster(ctdb, &key),
599 hdr->generation, ctdb->vnn_map->generation));
600 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
603 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
604 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
606 /* its a protocol error if the sending node is not the current dmaster */
607 if (header.dmaster != hdr->srcnode) {
608 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
609 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
610 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
611 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
612 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
613 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
614 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
616 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
617 ctdb_ltdb_unlock(ctdb_db, key);
618 return;
622 if (header.rsn > c->rsn) {
623 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
624 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
625 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
626 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
629 /* use the rsn from the sending node */
630 header.rsn = c->rsn;
632 /* store the record flags from the sending node */
633 header.flags = record_flags;
635 /* check if the new dmaster is the lmaster, in which case we
636 skip the dmaster reply */
637 if (c->dmaster == ctdb->pnn) {
638 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
639 } else {
640 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
642 ret = ctdb_ltdb_unlock(ctdb_db, key);
643 if (ret != 0) {
644 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
649 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
650 struct timeval t, void *private_data)
652 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
653 struct ctdb_sticky_record);
654 talloc_free(sr);
657 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
659 if (data) {
660 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
661 talloc_free(data);
663 return parm;
666 static int
667 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
669 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
670 uint32_t *k;
671 struct ctdb_sticky_record *sr;
673 k = ctdb_key_to_idkey(tmp_ctx, key);
674 if (k == NULL) {
675 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
676 talloc_free(tmp_ctx);
677 return -1;
680 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
681 if (sr != NULL) {
682 talloc_free(tmp_ctx);
683 return 0;
686 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
687 if (sr == NULL) {
688 talloc_free(tmp_ctx);
689 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
690 return -1;
693 sr->ctdb = ctdb;
694 sr->ctdb_db = ctdb_db;
695 sr->pindown = NULL;
697 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
698 ctdb->tunable.sticky_duration,
699 ctdb_db->db_name, ctdb_hash(&key)));
701 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
703 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
705 talloc_free(tmp_ctx);
706 return 0;
/* Carries a formerly pinned-down request to the zero-delay requeue timer. */
struct pinned_down_requeue_handle {
	struct ctdb_context *ctdb;	/* context to re-inject into */
	struct ctdb_req_header *hdr;	/* the request packet */
};
/*
 * A request held back while its record is pinned down; it is allocated
 * on the sticky record's pin-down context.
 */
struct pinned_down_deferred_call {
	struct ctdb_context *ctdb;	/* owning daemon context */
	struct ctdb_req_header *hdr;	/* the held-back packet */
};
719 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
720 struct timeval t, void *private_data)
722 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
723 struct ctdb_context *ctdb = handle->ctdb;
725 talloc_steal(ctdb, handle->hdr);
726 ctdb_call_input_pkt(ctdb, handle->hdr);
728 talloc_free(handle);
731 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
733 struct ctdb_context *ctdb = pinned_down->ctdb;
734 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
736 handle->ctdb = pinned_down->ctdb;
737 handle->hdr = pinned_down->hdr;
738 talloc_steal(handle, handle->hdr);
740 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
742 return 0;
745 static int
746 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
748 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
749 uint32_t *k;
750 struct ctdb_sticky_record *sr;
751 struct pinned_down_deferred_call *pinned_down;
753 k = ctdb_key_to_idkey(tmp_ctx, key);
754 if (k == NULL) {
755 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
756 talloc_free(tmp_ctx);
757 return -1;
760 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
761 if (sr == NULL) {
762 talloc_free(tmp_ctx);
763 return -1;
766 talloc_free(tmp_ctx);
768 if (sr->pindown == NULL) {
769 return -1;
772 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
773 if (pinned_down == NULL) {
774 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
775 return -1;
778 pinned_down->ctdb = ctdb;
779 pinned_down->hdr = hdr;
781 talloc_set_destructor(pinned_down, pinned_down_destructor);
782 talloc_steal(pinned_down, hdr);
784 return 0;
787 static void
788 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
790 int i, id;
792 /* smallest value is always at index 0 */
793 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
794 return;
797 /* see if we already know this key */
798 for (i = 0; i < MAX_HOT_KEYS; i++) {
799 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
800 continue;
802 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
803 continue;
805 /* found an entry for this key */
806 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
807 return;
809 ctdb_db->statistics.hot_keys[i].count = hopcount;
810 goto sort_keys;
813 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
814 id = ctdb_db->statistics.num_hot_keys;
815 ctdb_db->statistics.num_hot_keys++;
816 } else {
817 id = 0;
820 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
821 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
823 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
824 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
825 ctdb_db->statistics.hot_keys[id].count = hopcount;
826 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
827 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
829 sort_keys:
830 for (i = 1; i < MAX_HOT_KEYS; i++) {
831 if (ctdb_db->statistics.hot_keys[i].count == 0) {
832 continue;
834 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
835 hopcount = ctdb_db->statistics.hot_keys[i].count;
836 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
837 ctdb_db->statistics.hot_keys[0].count = hopcount;
839 key = ctdb_db->statistics.hot_keys[i].key;
840 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
841 ctdb_db->statistics.hot_keys[0].key = key;
847 called when a CTDB_REQ_CALL packet comes in
849 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
851 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
852 TDB_DATA data;
853 struct ctdb_reply_call *r;
854 int ret, len;
855 struct ctdb_ltdb_header header;
856 struct ctdb_call *call;
857 struct ctdb_db_context *ctdb_db;
858 int tmp_count, bucket;
860 if (ctdb->methods == NULL) {
861 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
862 return;
866 ctdb_db = find_ctdb_db(ctdb, c->db_id);
867 if (!ctdb_db) {
868 ctdb_send_error(ctdb, hdr, -1,
869 "Unknown database in request. db_id==0x%08x",
870 c->db_id);
871 return;
874 call = talloc(hdr, struct ctdb_call);
875 CTDB_NO_MEMORY_FATAL(ctdb, call);
877 call->call_id = c->callid;
878 call->key.dptr = c->data;
879 call->key.dsize = c->keylen;
880 call->call_data.dptr = c->data + c->keylen;
881 call->call_data.dsize = c->calldatalen;
882 call->reply_data.dptr = NULL;
883 call->reply_data.dsize = 0;
886 /* If this record is pinned down we should defer the
887 request until the pindown times out
889 if (ctdb_db->sticky) {
890 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
891 DEBUG(DEBUG_WARNING,
892 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
893 talloc_free(call);
894 return;
898 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
899 talloc_free(call);
900 return;
903 /* determine if we are the dmaster for this key. This also
904 fetches the record data (if any), thus avoiding a 2nd fetch of the data
905 if the call will be answered locally */
907 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
908 ctdb_call_input_pkt, ctdb, false);
909 if (ret == -1) {
910 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
911 talloc_free(call);
912 return;
914 if (ret == -2) {
915 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
916 talloc_free(call);
917 return;
920 /* Dont do READONLY if we dont have a tracking database */
921 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
922 c->flags &= ~CTDB_WANT_READONLY;
925 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
926 header.flags &= ~CTDB_REC_RO_FLAGS;
927 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
928 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
929 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
930 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
932 /* and clear out the tracking data */
933 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
934 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
938 /* if we are revoking, we must defer all other calls until the revoke
939 * had completed.
941 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
942 talloc_free(data.dptr);
943 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
945 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
946 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
948 talloc_free(call);
949 return;
953 * If we are not the dmaster and are not hosting any delegations,
954 * then we redirect the request to the node than can answer it
955 * (the lmaster or the dmaster).
957 if ((header.dmaster != ctdb->pnn)
958 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
959 talloc_free(data.dptr);
960 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
962 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
963 if (ret != 0) {
964 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
966 talloc_free(call);
967 return;
970 if ( (!(c->flags & CTDB_WANT_READONLY))
971 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
972 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
973 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
974 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
976 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
978 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
979 ctdb_fatal(ctdb, "Failed to start record revoke");
981 talloc_free(data.dptr);
983 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
984 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
986 talloc_free(call);
988 return;
991 /* If this is the first request for delegation. bump rsn and set
992 * the delegations flag
994 if ((c->flags & CTDB_WANT_READONLY)
995 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
996 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
997 header.rsn += 3;
998 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
999 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1000 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1003 if ((c->flags & CTDB_WANT_READONLY)
1004 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1005 TDB_DATA tdata;
1007 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1008 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1009 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1011 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1012 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1014 free(tdata.dptr);
1016 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1017 if (ret != 0) {
1018 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1021 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1022 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1023 struct ctdb_reply_call);
1024 CTDB_NO_MEMORY_FATAL(ctdb, r);
1025 r->hdr.destnode = c->hdr.srcnode;
1026 r->hdr.reqid = c->hdr.reqid;
1027 r->status = 0;
1028 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
1029 header.rsn -= 2;
1030 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1031 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1032 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1034 if (data.dsize) {
1035 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1038 ctdb_queue_packet(ctdb, &r->hdr);
1039 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1040 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
1042 talloc_free(r);
1043 talloc_free(call);
1044 return;
1047 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1048 tmp_count = c->hopcount;
1049 bucket = 0;
1050 while (tmp_count) {
1051 tmp_count >>= 2;
1052 bucket++;
1054 if (bucket >= MAX_COUNT_BUCKETS) {
1055 bucket = MAX_COUNT_BUCKETS - 1;
1057 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1058 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1059 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
1061 /* If this database supports sticky records, then check if the
1062 hopcount is big. If it is it means the record is hot and we
1063 should make it sticky.
1065 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1066 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1070 /* Try if possible to migrate the record off to the caller node.
1071 * From the clients perspective a fetch of the data is just as
1072 * expensive as a migration.
1074 if (c->hdr.srcnode != ctdb->pnn) {
1075 if (ctdb_db->persistent_state) {
1076 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1077 " of key %s while transaction is active\n",
1078 (char *)call->key.dptr));
1079 } else {
1080 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1081 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1082 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1083 talloc_free(data.dptr);
1085 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1086 if (ret != 0) {
1087 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1090 talloc_free(call);
1091 return;
1094 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1095 if (ret != 0) {
1096 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1097 call->status = -1;
1100 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1101 if (ret != 0) {
1102 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1105 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
1106 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1107 struct ctdb_reply_call);
1108 CTDB_NO_MEMORY_FATAL(ctdb, r);
1109 r->hdr.destnode = hdr->srcnode;
1110 r->hdr.reqid = hdr->reqid;
1111 r->status = call->status;
1112 r->datalen = call->reply_data.dsize;
1113 if (call->reply_data.dsize) {
1114 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1117 ctdb_queue_packet(ctdb, &r->hdr);
1119 talloc_free(r);
1120 talloc_free(call);
1124 * called when a CTDB_REPLY_CALL packet comes in
1126 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1127 * contains any reply data from the call
1129 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1131 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1132 struct ctdb_call_state *state;
1134 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1135 if (state == NULL) {
1136 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1137 return;
1140 if (hdr->reqid != state->reqid) {
1141 /* we found a record but it was the wrong one */
1142 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1143 return;
1147 /* read only delegation processing */
1148 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1149 * delegation since we may need to update the record header
1151 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1152 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1153 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1154 struct ctdb_ltdb_header oldheader;
1155 TDB_DATA key, data, olddata;
1156 int ret;
1158 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1159 goto finished_ro;
1160 return;
1163 key.dsize = state->c->keylen;
1164 key.dptr = state->c->data;
1165 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1166 ctdb_call_input_pkt, ctdb, false);
1167 if (ret == -2) {
1168 return;
1170 if (ret != 0) {
1171 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1172 return;
1175 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1176 if (ret != 0) {
1177 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1178 ctdb_ltdb_unlock(ctdb_db, key);
1179 goto finished_ro;
1182 if (header->rsn <= oldheader.rsn) {
1183 ctdb_ltdb_unlock(ctdb_db, key);
1184 goto finished_ro;
1187 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1188 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1189 ctdb_ltdb_unlock(ctdb_db, key);
1190 goto finished_ro;
1193 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1194 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1195 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1196 if (ret != 0) {
1197 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1198 ctdb_ltdb_unlock(ctdb_db, key);
1199 goto finished_ro;
1202 ctdb_ltdb_unlock(ctdb_db, key);
1204 finished_ro:
1206 state->call->reply_data.dptr = c->data;
1207 state->call->reply_data.dsize = c->datalen;
1208 state->call->status = c->status;
1210 talloc_steal(state, c);
1212 state->state = CTDB_CALL_DONE;
1213 if (state->async.fn) {
1214 state->async.fn(state);
1220 * called when a CTDB_REPLY_DMASTER packet comes in
1222 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1223 * request packet. It means that the current dmaster wants to give us
1224 * the dmaster role.
1226 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1228 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1229 struct ctdb_db_context *ctdb_db;
1230 TDB_DATA key, data;
1231 uint32_t record_flags = 0;
1232 size_t len;
1233 int ret;
1235 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1236 if (ctdb_db == NULL) {
1237 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1238 return;
1241 key.dptr = c->data;
1242 key.dsize = c->keylen;
1243 data.dptr = &c->data[key.dsize];
1244 data.dsize = c->datalen;
1245 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1246 + sizeof(uint32_t);
1247 if (len <= c->hdr.length) {
1248 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1249 sizeof(record_flags));
1252 dmaster_defer_setup(ctdb_db, hdr, key);
1254 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1255 ctdb_call_input_pkt, ctdb, false);
1256 if (ret == -2) {
1257 return;
1259 if (ret != 0) {
1260 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1261 return;
1264 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1269 called when a CTDB_REPLY_ERROR packet comes in
1271 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1273 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1274 struct ctdb_call_state *state;
1276 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1277 if (state == NULL) {
1278 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1279 ctdb->pnn, hdr->reqid));
1280 return;
1283 if (hdr->reqid != state->reqid) {
1284 /* we found a record but it was the wrong one */
1285 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1286 return;
1289 talloc_steal(state, c);
1291 state->state = CTDB_CALL_ERROR;
1292 state->errmsg = (char *)c->msg;
1293 if (state->async.fn) {
1294 state->async.fn(state);
1300 destroy a ctdb_call
1302 static int ctdb_call_destructor(struct ctdb_call_state *state)
1304 DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1305 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1306 return 0;
1311 called when a ctdb_call needs to be resent after a reconfigure event
1313 static void ctdb_call_resend(struct ctdb_call_state *state)
1315 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1317 state->generation = ctdb->vnn_map->generation;
1319 /* use a new reqid, in case the old reply does eventually come in */
1320 ctdb_reqid_remove(ctdb, state->reqid);
1321 state->reqid = ctdb_reqid_new(ctdb, state);
1322 state->c->hdr.reqid = state->reqid;
1324 /* update the generation count for this request, so its valid with the new vnn_map */
1325 state->c->hdr.generation = state->generation;
1327 /* send the packet to ourselves, it will be redirected appropriately */
1328 state->c->hdr.destnode = ctdb->pnn;
1330 ctdb_queue_packet(ctdb, &state->c->hdr);
1331 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1335 resend all pending calls on recovery
1337 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1339 struct ctdb_call_state *state, *next;
1340 for (state=ctdb->pending_calls;state;state=next) {
1341 next = state->next;
1342 ctdb_call_resend(state);
1347 this allows the caller to setup a async.fn
1349 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1350 struct timeval t, void *private_data)
1352 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1353 if (state->async.fn) {
1354 state->async.fn(state);
1360 construct an event driven local ctdb_call
1362 this is used so that locally processed ctdb_call requests are processed
1363 in an event driven manner
1365 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1366 struct ctdb_call *call,
1367 struct ctdb_ltdb_header *header,
1368 TDB_DATA *data)
1370 struct ctdb_call_state *state;
1371 struct ctdb_context *ctdb = ctdb_db->ctdb;
1372 int ret;
1374 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1375 CTDB_NO_MEMORY_NULL(ctdb, state);
1377 talloc_steal(state, data->dptr);
1379 state->state = CTDB_CALL_DONE;
1380 state->call = talloc(state, struct ctdb_call);
1381 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1382 *(state->call) = *call;
1383 state->ctdb_db = ctdb_db;
1385 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1386 if (ret != 0) {
1387 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1390 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1392 return state;
1397 make a remote ctdb call - async send. Called in daemon context.
1399 This constructs a ctdb_call request and queues it for processing.
1400 This call never blocks.
1402 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1403 struct ctdb_call *call,
1404 struct ctdb_ltdb_header *header)
1406 uint32_t len;
1407 struct ctdb_call_state *state;
1408 struct ctdb_context *ctdb = ctdb_db->ctdb;
1410 if (ctdb->methods == NULL) {
1411 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1412 return NULL;
1415 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1416 CTDB_NO_MEMORY_NULL(ctdb, state);
1417 state->call = talloc(state, struct ctdb_call);
1418 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1420 state->reqid = ctdb_reqid_new(ctdb, state);
1421 state->ctdb_db = ctdb_db;
1422 talloc_set_destructor(state, ctdb_call_destructor);
1424 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1425 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1426 struct ctdb_req_call);
1427 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1428 state->c->hdr.destnode = header->dmaster;
1430 /* this limits us to 16k outstanding messages - not unreasonable */
1431 state->c->hdr.reqid = state->reqid;
1432 state->c->flags = call->flags;
1433 state->c->db_id = ctdb_db->db_id;
1434 state->c->callid = call->call_id;
1435 state->c->hopcount = 0;
1436 state->c->keylen = call->key.dsize;
1437 state->c->calldatalen = call->call_data.dsize;
1438 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1439 memcpy(&state->c->data[call->key.dsize],
1440 call->call_data.dptr, call->call_data.dsize);
1441 *(state->call) = *call;
1442 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1443 state->call->key.dptr = &state->c->data[0];
1445 state->state = CTDB_CALL_WAIT;
1446 state->generation = ctdb->vnn_map->generation;
1448 DLIST_ADD(ctdb->pending_calls, state);
1450 ctdb_queue_packet(ctdb, &state->c->hdr);
1452 return state;
1456 make a remote ctdb call - async recv - called in daemon context
1458 This is called when the program wants to wait for a ctdb_call to complete and get the
1459 results. This call will block unless the call has already completed.
1461 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1463 while (state->state < CTDB_CALL_DONE) {
1464 event_loop_once(state->ctdb_db->ctdb->ev);
1466 if (state->state != CTDB_CALL_DONE) {
1467 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1468 talloc_free(state);
1469 return -1;
1472 if (state->call->reply_data.dsize) {
1473 call->reply_data.dptr = talloc_memdup(call,
1474 state->call->reply_data.dptr,
1475 state->call->reply_data.dsize);
1476 call->reply_data.dsize = state->call->reply_data.dsize;
1477 } else {
1478 call->reply_data.dptr = NULL;
1479 call->reply_data.dsize = 0;
1481 call->status = state->call->status;
1482 talloc_free(state);
1483 return 0;
1488 send a keepalive packet to the other node
1490 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1492 struct ctdb_req_keepalive *r;
1494 if (ctdb->methods == NULL) {
1495 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1496 return;
1499 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1500 sizeof(struct ctdb_req_keepalive),
1501 struct ctdb_req_keepalive);
1502 CTDB_NO_MEMORY_FATAL(ctdb, r);
1503 r->hdr.destnode = destnode;
1504 r->hdr.reqid = 0;
1506 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1508 ctdb_queue_packet(ctdb, &r->hdr);
1510 talloc_free(r);
1515 struct revokechild_deferred_call {
1516 struct ctdb_context *ctdb;
1517 struct ctdb_req_header *hdr;
1518 deferred_requeue_fn fn;
1519 void *ctx;
1522 struct revokechild_handle {
1523 struct revokechild_handle *next, *prev;
1524 struct ctdb_context *ctdb;
1525 struct ctdb_db_context *ctdb_db;
1526 struct fd_event *fde;
1527 int status;
1528 int fd[2];
1529 pid_t child;
1530 TDB_DATA key;
1533 struct revokechild_requeue_handle {
1534 struct ctdb_context *ctdb;
1535 struct ctdb_req_header *hdr;
1536 deferred_requeue_fn fn;
1537 void *ctx;
1540 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1541 struct timeval t, void *private_data)
1543 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1545 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1546 talloc_free(requeue_handle);
1549 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1551 struct ctdb_context *ctdb = deferred_call->ctdb;
1552 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1553 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1555 requeue_handle->ctdb = ctdb;
1556 requeue_handle->hdr = deferred_call->hdr;
1557 requeue_handle->fn = deferred_call->fn;
1558 requeue_handle->ctx = deferred_call->ctx;
1559 talloc_steal(requeue_handle, requeue_handle->hdr);
1561 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1562 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1564 return 0;
1568 static int revokechild_destructor(struct revokechild_handle *rc)
1570 if (rc->fde != NULL) {
1571 talloc_free(rc->fde);
1574 if (rc->fd[0] != -1) {
1575 close(rc->fd[0]);
1577 if (rc->fd[1] != -1) {
1578 close(rc->fd[1]);
1580 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1582 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1583 return 0;
1586 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1587 uint16_t flags, void *private_data)
1589 struct revokechild_handle *rc = talloc_get_type(private_data,
1590 struct revokechild_handle);
1591 int ret;
1592 char c;
1594 ret = sys_read(rc->fd[0], &c, 1);
1595 if (ret != 1) {
1596 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1597 rc->status = -1;
1598 talloc_free(rc);
1599 return;
1601 if (c != 0) {
1602 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1603 rc->status = -1;
1604 talloc_free(rc);
1605 return;
1608 talloc_free(rc);
1611 struct ctdb_revoke_state {
1612 struct ctdb_db_context *ctdb_db;
1613 TDB_DATA key;
1614 struct ctdb_ltdb_header *header;
1615 TDB_DATA data;
1616 int count;
1617 int status;
1618 int finished;
1621 static void update_record_cb(struct ctdb_client_control_state *state)
1623 struct ctdb_revoke_state *revoke_state;
1624 int ret;
1625 int32_t res;
1627 if (state == NULL) {
1628 return;
1630 revoke_state = state->async.private_data;
1632 state->async.fn = NULL;
1633 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1634 if ((ret != 0) || (res != 0)) {
1635 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1636 revoke_state->status = -1;
1639 revoke_state->count--;
1640 if (revoke_state->count <= 0) {
1641 revoke_state->finished = 1;
1645 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1647 struct ctdb_revoke_state *revoke_state = private_data;
1648 struct ctdb_client_control_state *state;
1650 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1651 if (state == NULL) {
1652 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1653 revoke_state->status = -1;
1654 return;
1656 state->async.fn = update_record_cb;
1657 state->async.private_data = revoke_state;
1659 revoke_state->count++;
1663 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1664 struct timeval yt, void *private_data)
1666 struct ctdb_revoke_state *state = private_data;
1668 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1669 state->finished = 1;
1670 state->status = -1;
1673 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1675 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1676 struct ctdb_ltdb_header new_header;
1677 TDB_DATA new_data;
1679 state->ctdb_db = ctdb_db;
1680 state->key = key;
1681 state->header = header;
1682 state->data = data;
1684 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1686 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
1688 while (state->finished == 0) {
1689 event_loop_once(ctdb->ev);
1692 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1693 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1694 talloc_free(state);
1695 return -1;
1697 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1698 ctdb_ltdb_unlock(ctdb_db, key);
1699 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1700 talloc_free(state);
1701 return -1;
1703 header->rsn++;
1704 if (new_header.rsn > header->rsn) {
1705 ctdb_ltdb_unlock(ctdb_db, key);
1706 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1707 talloc_free(state);
1708 return -1;
1710 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1711 ctdb_ltdb_unlock(ctdb_db, key);
1712 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1713 talloc_free(state);
1714 return -1;
1718 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1719 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1721 if (state->status == 0) {
1722 new_header.rsn++;
1723 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1724 } else {
1725 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1726 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1728 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1729 ctdb_ltdb_unlock(ctdb_db, key);
1730 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1731 talloc_free(state);
1732 return -1;
1734 ctdb_ltdb_unlock(ctdb_db, key);
1736 talloc_free(state);
1737 return 0;
1741 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1743 TDB_DATA tdata;
1744 struct revokechild_handle *rc;
1745 pid_t parent = getpid();
1746 int ret;
1748 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1749 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1750 header->rsn -= 1;
1752 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1753 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1754 return -1;
1757 tdata = tdb_fetch(ctdb_db->rottdb, key);
1758 if (tdata.dsize > 0) {
1759 uint8_t *tmp;
1761 tmp = tdata.dptr;
1762 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1763 free(tmp);
1766 rc->status = 0;
1767 rc->ctdb = ctdb;
1768 rc->ctdb_db = ctdb_db;
1769 rc->fd[0] = -1;
1770 rc->fd[1] = -1;
1772 talloc_set_destructor(rc, revokechild_destructor);
1774 rc->key.dsize = key.dsize;
1775 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1776 if (rc->key.dptr == NULL) {
1777 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1778 talloc_free(rc);
1779 return -1;
1782 ret = pipe(rc->fd);
1783 if (ret != 0) {
1784 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1785 talloc_free(rc);
1786 return -1;
1790 rc->child = ctdb_fork(ctdb);
1791 if (rc->child == (pid_t)-1) {
1792 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1793 talloc_free(rc);
1794 return -1;
1797 if (rc->child == 0) {
1798 char c = 0;
1799 close(rc->fd[0]);
1800 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1802 ctdb_set_process_name("ctdb_revokechild");
1803 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1804 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1805 c = 1;
1806 goto child_finished;
1809 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1811 child_finished:
1812 sys_write(rc->fd[1], &c, 1);
1813 /* make sure we die when our parent dies */
1814 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1815 sleep(5);
1817 _exit(0);
1820 close(rc->fd[1]);
1821 rc->fd[1] = -1;
1822 set_close_on_exec(rc->fd[0]);
1824 /* This is an active revokechild child process */
1825 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1827 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1828 EVENT_FD_READ, revokechild_handler,
1829 (void *)rc);
1830 if (rc->fde == NULL) {
1831 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1832 talloc_free(rc);
1834 tevent_fd_set_auto_close(rc->fde);
1836 return 0;
1839 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1841 struct revokechild_handle *rc;
1842 struct revokechild_deferred_call *deferred_call;
1844 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1845 if (rc->key.dsize == 0) {
1846 continue;
1848 if (rc->key.dsize != key.dsize) {
1849 continue;
1851 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1852 break;
1856 if (rc == NULL) {
1857 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1858 return -1;
1861 deferred_call = talloc(rc, struct revokechild_deferred_call);
1862 if (deferred_call == NULL) {
1863 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1864 return -1;
1867 deferred_call->ctdb = ctdb;
1868 deferred_call->hdr = hdr;
1869 deferred_call->fn = fn;
1870 deferred_call->ctx = call_context;
1872 talloc_set_destructor(deferred_call, deferred_call_destructor);
1873 talloc_steal(deferred_call, hdr);
1875 return 0;