server: fix wording and punctuation in comment block for ctdb_reply_dmaster().
[Samba.git] / ctdb / server / ctdb_call.c
blob a4cab3fb252f2fcdf162a4f2d428f842fcffa7de
1 /*
2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record {
32 struct ctdb_context *ctdb;
33 struct ctdb_db_context *ctdb_db;
34 TDB_CONTEXT *pindown;
38 find the ctdb_db from a db index
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
46 break;
49 return ctdb_db;
53 a variant of ctdb_input_pkt() that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
63 send an error reply
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
70 const char *fmt, ...)
72 va_list ap;
73 struct ctdb_reply_error *r;
74 char *msg;
75 int msglen, len;
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79 return;
82 va_start(ap, fmt);
83 msg = talloc_vasprintf(ctdb, fmt, ap);
84 if (msg == NULL) {
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
87 va_end(ap);
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
97 r->status = status;
98 r->msglen = msglen;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
103 talloc_free(msg);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. This works because, whenever a record is
117 * migrated off a node, the new DMASTER is stored in the copy of the
118 * record left behind on the former DMASTER.
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121 struct ctdb_db_context *ctdb_db,
122 TDB_DATA key,
123 struct ctdb_req_call *c,
124 struct ctdb_ltdb_header *header)
126 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
128 c->hdr.destnode = lmaster;
129 if (ctdb->pnn == lmaster) {
130 c->hdr.destnode = header->dmaster;
132 c->hopcount++;
134 if (c->hopcount%100 > 95) {
135 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
136 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
139 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
140 header->dmaster, c->hdr.destnode));
143 ctdb_queue_packet(ctdb, &c->hdr);
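/*
  Illustrative sketch only (hypothetical helper, not part of ctdb): how a
  record's LMASTER is derived from its key, assuming the hash-modulo-vnn_map
  scheme that the real ctdb_lmaster() helper implements.
*/
static uint32_t example_lmaster_for_key(struct ctdb_context *ctdb, TDB_DATA key)
{
	/* hash the key and map it onto a slot in the current vnn_map */
	uint32_t idx = ctdb_hash(&key) % ctdb->vnn_map->size;

	/* that slot holds the pnn of the node acting as lmaster */
	return ctdb->vnn_map->map[idx];
}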
148 send a dmaster reply
150 The caller must hold the chainlock before calling this routine, and must
151 be the lmaster.
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154 struct ctdb_ltdb_header *header,
155 TDB_DATA key, TDB_DATA data,
156 uint32_t new_dmaster,
157 uint32_t reqid)
159 struct ctdb_context *ctdb = ctdb_db->ctdb;
160 struct ctdb_reply_dmaster *r;
161 int ret, len;
162 TALLOC_CTX *tmp_ctx;
164 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
166 return;
169 header->dmaster = new_dmaster;
170 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
171 if (ret != 0) {
172 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
173 return;
176 if (ctdb->methods == NULL) {
177 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply can't update dmaster since transport is down");
178 return;
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx = talloc_new(ctdb);
185 /* send the CTDB_REPLY_DMASTER */
186 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188 struct ctdb_reply_dmaster);
189 CTDB_NO_MEMORY_FATAL(ctdb, r);
191 r->hdr.destnode = new_dmaster;
192 r->hdr.reqid = reqid;
193 r->rsn = header->rsn;
194 r->keylen = key.dsize;
195 r->datalen = data.dsize;
196 r->db_id = ctdb_db->db_id;
197 memcpy(&r->data[0], key.dptr, key.dsize);
198 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
199 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
201 ctdb_queue_packet(ctdb, &r->hdr);
203 talloc_free(tmp_ctx);
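/*
  Illustrative sketch (hypothetical helper, for exposition only): the
  variable-length tail packed above is laid out as
      [ key (keylen bytes) ][ data (datalen bytes) ][ record flags (uint32_t) ]
  and a receiver can unpack it like this; ctdb_reply_dmaster() further down
  does the equivalent on the real packet.
*/
static void example_unpack_reply_dmaster(struct ctdb_reply_dmaster *r,
					 TDB_DATA *key, TDB_DATA *data,
					 uint32_t *record_flags)
{
	key->dptr   = &r->data[0];
	key->dsize  = r->keylen;
	data->dptr  = &r->data[r->keylen];
	data->dsize = r->datalen;
	memcpy(record_flags, &r->data[r->keylen + r->datalen], sizeof(uint32_t));
}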
207 send a dmaster request (give another node the dmaster for a record)
209 This is always sent to the lmaster, which ensures that the lmaster
210 always knows who the dmaster is. The lmaster will then send a
211 CTDB_REPLY_DMASTER to the new dmaster
213 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
214 struct ctdb_req_call *c,
215 struct ctdb_ltdb_header *header,
216 TDB_DATA *key, TDB_DATA *data)
218 struct ctdb_req_dmaster *r;
219 struct ctdb_context *ctdb = ctdb_db->ctdb;
220 int len;
221 uint32_t lmaster = ctdb_lmaster(ctdb, key);
223 if (ctdb->methods == NULL) {
224 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
225 return;
228 if (data->dsize != 0) {
229 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
232 if (lmaster == ctdb->pnn) {
233 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
234 c->hdr.srcnode, c->hdr.reqid);
235 return;
238 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
239 + sizeof(uint32_t);
240 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
241 struct ctdb_req_dmaster);
242 CTDB_NO_MEMORY_FATAL(ctdb, r);
243 r->hdr.destnode = lmaster;
244 r->hdr.reqid = c->hdr.reqid;
245 r->db_id = c->db_id;
246 r->rsn = header->rsn;
247 r->dmaster = c->hdr.srcnode;
248 r->keylen = key->dsize;
249 r->datalen = data->dsize;
250 memcpy(&r->data[0], key->dptr, key->dsize);
251 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
252 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
254 header->dmaster = c->hdr.srcnode;
255 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
256 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
259 ctdb_queue_packet(ctdb, &r->hdr);
261 talloc_free(r);
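/*
  Illustrative summary of the migration handshake implemented above, for the
  common case where the requesting node, the lmaster and the current dmaster
  are three different nodes:

      requester --CTDB_REQ_CALL-->     lmaster --CTDB_REQ_CALL--> dmaster
      dmaster   --CTDB_REQ_DMASTER-->  lmaster
      lmaster   --CTDB_REPLY_DMASTER-> requester   (requester becomes dmaster)

  When the lmaster is itself the current dmaster or the new dmaster, the
  short-circuits in ctdb_call_send_dmaster() and ctdb_request_dmaster() skip
  the redundant network hops.
*/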
264 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
265 struct timeval t, void *private_data)
267 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
268 struct ctdb_sticky_record);
270 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
271 if (sr->pindown != NULL) {
272 talloc_free(sr->pindown);
273 sr->pindown = NULL;
277 static int
278 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
280 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
281 uint32_t *k;
282 struct ctdb_sticky_record *sr;
284 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
285 if (k == NULL) {
286 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
287 talloc_free(tmp_ctx);
288 return -1;
291 k[0] = (key.dsize + 3) / 4 + 1;
292 memcpy(&k[1], key.dptr, key.dsize);
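	/* the sticky-record rb-tree is keyed by an array of uint32_t: k[0]
	   holds the total length in words (including k[0] itself) and the
	   TDB key bytes follow, zero-padded to a multiple of four bytes, so
	   e.g. a 5 byte key becomes a 3 word array */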
294 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
295 if (sr == NULL) {
296 talloc_free(tmp_ctx);
297 return 0;
300 talloc_free(tmp_ctx);
302 if (sr->pindown == NULL) {
303 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
304 sr->pindown = talloc_new(sr);
305 if (sr->pindown == NULL) {
306 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
307 return -1;
309 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
312 return 0;
316 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317 gets a CTDB_REQ_DMASTER for itself. We become the dmaster.
319 must be called with the chainlock held. This function releases the chainlock
321 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
322 struct ctdb_req_header *hdr,
323 TDB_DATA key, TDB_DATA data,
324 uint64_t rsn, uint32_t record_flags)
326 struct ctdb_call_state *state;
327 struct ctdb_context *ctdb = ctdb_db->ctdb;
328 struct ctdb_ltdb_header header;
329 int ret;
331 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
333 ZERO_STRUCT(header);
334 header.rsn = rsn;
335 header.dmaster = ctdb->pnn;
336 header.flags = record_flags;
338 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
340 if (state) {
341 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
343 * We temporarily add the VACUUM_MIGRATED flag to
344 * the record flags, so that ctdb_ltdb_store can
345 * decide whether the record should be stored or
346 * deleted.
348 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
352 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
353 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
355 ret = ctdb_ltdb_unlock(ctdb_db, key);
356 if (ret != 0) {
357 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
359 return;
362 /* we just became DMASTER and this database is "sticky"; if the
363 record is flagged as "hot", set up a pin-down context to stop
364 migrations for a little while
366 if (ctdb_db->sticky) {
367 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
370 if (state == NULL) {
371 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372 ctdb->pnn, hdr->reqid, hdr->srcnode));
374 ret = ctdb_ltdb_unlock(ctdb_db, key);
375 if (ret != 0) {
376 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
378 return;
381 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
382 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
384 ret = ctdb_ltdb_unlock(ctdb_db, key);
385 if (ret != 0) {
386 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
388 return;
391 if (hdr->reqid != state->reqid) {
392 /* we found a record but it was the wrong one */
393 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
395 ret = ctdb_ltdb_unlock(ctdb_db, key);
396 if (ret != 0) {
397 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
399 return;
402 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
404 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
405 if (ret != 0) {
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 state->state = CTDB_CALL_DONE;
410 if (state->async.fn) {
411 state->async.fn(state);
418 called when a CTDB_REQ_DMASTER packet comes in
420 this comes into the lmaster for a record when the current dmaster
421 wants to give up the dmaster role and give it to someone else
423 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
425 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
426 TDB_DATA key, data, data2;
427 struct ctdb_ltdb_header header;
428 struct ctdb_db_context *ctdb_db;
429 uint32_t record_flags = 0;
430 size_t len;
431 int ret;
433 key.dptr = c->data;
434 key.dsize = c->keylen;
435 data.dptr = c->data + c->keylen;
436 data.dsize = c->datalen;
437 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
438 + sizeof(uint32_t);
439 if (len <= c->hdr.length) {
440 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
443 ctdb_db = find_ctdb_db(ctdb, c->db_id);
444 if (!ctdb_db) {
445 ctdb_send_error(ctdb, hdr, -1,
446 "Unknown database in request. db_id==0x%08x",
447 c->db_id);
448 return;
451 /* fetch the current record */
452 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
453 ctdb_call_input_pkt, ctdb, false);
454 if (ret == -1) {
455 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
456 return;
458 if (ret == -2) {
459 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
460 return;
463 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
464 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
465 ctdb->pnn, ctdb_lmaster(ctdb, &key),
466 hdr->generation, ctdb->vnn_map->generation));
467 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
470 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
471 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
473 /* it's a protocol error if the sending node is not the current dmaster */
474 if (header.dmaster != hdr->srcnode) {
475 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
476 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
477 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
478 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
479 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
480 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
481 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
483 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
484 ctdb_ltdb_unlock(ctdb_db, key);
485 return;
489 if (header.rsn > c->rsn) {
490 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
491 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
492 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
493 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
496 /* use the rsn from the sending node */
497 header.rsn = c->rsn;
499 /* store the record flags from the sending node */
500 header.flags = record_flags;
502 /* check if the new dmaster is the lmaster, in which case we
503 skip the dmaster reply */
504 if (c->dmaster == ctdb->pnn) {
505 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
506 } else {
507 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
509 ret = ctdb_ltdb_unlock(ctdb_db, key);
510 if (ret != 0) {
511 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
516 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
517 struct timeval t, void *private_data)
519 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
520 struct ctdb_sticky_record);
521 talloc_free(sr);
524 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
526 if (data) {
527 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
528 talloc_free(data);
530 return parm;
533 static int
534 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
536 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
537 uint32_t *k;
538 struct ctdb_sticky_record *sr;
540 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
541 if (k == NULL) {
542 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
543 talloc_free(tmp_ctx);
544 return -1;
547 k[0] = (key.dsize + 3) / 4 + 1;
548 memcpy(&k[1], key.dptr, key.dsize);
550 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
551 if (sr != NULL) {
552 talloc_free(tmp_ctx);
553 return 0;
556 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
557 if (sr == NULL) {
558 talloc_free(tmp_ctx);
559 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
560 return -1;
563 sr->ctdb = ctdb;
564 sr->ctdb_db = ctdb_db;
565 sr->pindown = NULL;
567 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
568 ctdb->tunable.sticky_duration,
569 ctdb_db->db_name, ctdb_hash(&key)));
571 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
573 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
575 talloc_free(tmp_ctx);
576 return 0;
579 struct pinned_down_requeue_handle {
580 struct ctdb_context *ctdb;
581 struct ctdb_req_header *hdr;
584 struct pinned_down_deferred_call {
585 struct ctdb_context *ctdb;
586 struct ctdb_req_header *hdr;
589 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
590 struct timeval t, void *private_data)
592 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
593 struct ctdb_context *ctdb = handle->ctdb;
595 talloc_steal(ctdb, handle->hdr);
596 ctdb_call_input_pkt(ctdb, handle->hdr);
598 talloc_free(handle);
601 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
603 struct ctdb_context *ctdb = pinned_down->ctdb;
604 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
606 handle->ctdb = pinned_down->ctdb;
607 handle->hdr = pinned_down->hdr;
608 talloc_steal(handle, handle->hdr);
610 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
612 return 0;
615 static int
616 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
618 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
619 uint32_t *k;
620 struct ctdb_sticky_record *sr;
621 struct pinned_down_deferred_call *pinned_down;
623 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
624 if (k == NULL) {
625 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
626 talloc_free(tmp_ctx);
627 return -1;
630 k[0] = (key.dsize + 3) / 4 + 1;
631 memcpy(&k[1], key.dptr, key.dsize);
633 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
634 if (sr == NULL) {
635 talloc_free(tmp_ctx);
636 return -1;
639 talloc_free(tmp_ctx);
641 if (sr->pindown == NULL) {
642 return -1;
645 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
646 if (pinned_down == NULL) {
647 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
648 return -1;
651 pinned_down->ctdb = ctdb;
652 pinned_down->hdr = hdr;
654 talloc_set_destructor(pinned_down, pinned_down_destructor);
655 talloc_steal(pinned_down, hdr);
657 return 0;
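/*
  Summary of the pin-down machinery above: deferred packets are allocated as
  children of sr->pindown, so when that talloc context goes away (normally via
  ctdb_sticky_pindown_timeout()) each pinned_down_deferred_call destructor
  fires, hands its packet to a zero-timeout event and the packet re-enters
  processing through ctdb_call_input_pkt().
*/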
660 static void
661 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
663 int i, id;
665 /* smallest value is always at index 0 */
666 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
667 return;
670 /* see if we already know this key */
671 for (i = 0; i < MAX_HOT_KEYS; i++) {
672 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
673 continue;
675 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
676 continue;
678 /* found an entry for this key */
679 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
680 return;
682 ctdb_db->statistics.hot_keys[i].count = hopcount;
683 goto sort_keys;
686 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
687 id = ctdb_db->statistics.num_hot_keys;
688 ctdb_db->statistics.num_hot_keys++;
689 } else {
690 id = 0;
693 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
694 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
696 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
697 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
698 ctdb_db->statistics.hot_keys[id].count = hopcount;
700 sort_keys:
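	/* keep the smallest hopcount at index 0, so the early-return check at
	   the top of this function stays a single comparison */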
701 for (i = 1; i < MAX_HOT_KEYS; i++) {
702 if (ctdb_db->statistics.hot_keys[i].count == 0) {
703 continue;
705 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
706 hopcount = ctdb_db->statistics.hot_keys[i].count;
707 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
708 ctdb_db->statistics.hot_keys[0].count = hopcount;
710 key = ctdb_db->statistics.hot_keys[i].key;
711 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
712 ctdb_db->statistics.hot_keys[0].key = key;
718 called when a CTDB_REQ_CALL packet comes in
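  Overview of the flow below: defer the call if the record is pinned down;
  take the chainlock and fetch the local copy; finish any pending read-only
  revoke bookkeeping; redirect towards the lmaster/dmaster if this node is
  neither dmaster nor holding delegations; hand out (or start revoking)
  read-only delegations for CTDB_FETCH_WITH_HEADER_FUNC calls; otherwise
  either migrate the record to the calling node or run the call locally and
  send back a CTDB_REPLY_CALL.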
720 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
722 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
723 TDB_DATA data;
724 struct ctdb_reply_call *r;
725 int ret, len;
726 struct ctdb_ltdb_header header;
727 struct ctdb_call *call;
728 struct ctdb_db_context *ctdb_db;
729 int tmp_count, bucket;
731 if (ctdb->methods == NULL) {
732 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
733 return;
737 ctdb_db = find_ctdb_db(ctdb, c->db_id);
738 if (!ctdb_db) {
739 ctdb_send_error(ctdb, hdr, -1,
740 "Unknown database in request. db_id==0x%08x",
741 c->db_id);
742 return;
745 call = talloc(hdr, struct ctdb_call);
746 CTDB_NO_MEMORY_FATAL(ctdb, call);
748 call->call_id = c->callid;
749 call->key.dptr = c->data;
750 call->key.dsize = c->keylen;
751 call->call_data.dptr = c->data + c->keylen;
752 call->call_data.dsize = c->calldatalen;
753 call->reply_data.dptr = NULL;
754 call->reply_data.dsize = 0;
757 /* If this record is pinned down we should defer the
758 request until the pindown times out
760 if (ctdb_db->sticky) {
761 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
762 DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
763 return;
768 /* determine if we are the dmaster for this key. This also
769 fetches the record data (if any), thus avoiding a 2nd fetch of the data
770 if the call will be answered locally */
772 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
773 ctdb_call_input_pkt, ctdb, false);
774 if (ret == -1) {
775 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
776 return;
778 if (ret == -2) {
779 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
780 return;
783 /* Don't do READONLY if we don't have a tracking database */
784 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
785 c->flags &= ~CTDB_WANT_READONLY;
788 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
789 header.flags &= ~CTDB_REC_RO_FLAGS;
790 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
791 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
792 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
793 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
795 /* and clear out the tracking data */
796 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
797 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
801 /* if we are revoking, we must defer all other calls until the revoke
802 * has completed.
804 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
805 talloc_free(data.dptr);
806 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
808 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
809 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
811 talloc_free(call);
812 return;
816 * If we are not the dmaster and are not hosting any delegations,
817 * then we redirect the request to the node that can answer it
818 * (the lmaster or the dmaster).
820 if ((header.dmaster != ctdb->pnn)
821 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
822 talloc_free(data.dptr);
823 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
825 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
826 if (ret != 0) {
827 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
829 return;
832 if ( (!(c->flags & CTDB_WANT_READONLY))
833 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
834 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
835 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
836 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
838 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
840 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
841 ctdb_fatal(ctdb, "Failed to start record revoke");
843 talloc_free(data.dptr);
845 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
846 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
848 talloc_free(call);
850 return;
853 /* If this is the first request for a delegation, bump the rsn and set
854 * the delegations flag
856 if ((c->flags & CTDB_WANT_READONLY)
857 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
858 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
859 header.rsn += 3;
860 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
861 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
862 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
865 if ((c->flags & CTDB_WANT_READONLY)
866 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
867 TDB_DATA tdata;
869 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
870 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
871 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
873 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
874 ctdb_fatal(ctdb, "Failed to store trackingdb data");
876 free(tdata.dptr);
878 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
879 if (ret != 0) {
880 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
883 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
884 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
885 struct ctdb_reply_call);
886 CTDB_NO_MEMORY_FATAL(ctdb, r);
887 r->hdr.destnode = c->hdr.srcnode;
888 r->hdr.reqid = c->hdr.reqid;
889 r->status = 0;
890 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
891 header.rsn -= 2;
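	/* the read-only copy handed to the delegate deliberately carries an
	   rsn below the copy kept on this node (whose rsn was bumped by 3
	   when the first delegation was created), so the authoritative copy
	   always wins an rsn comparison */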
892 header.flags |= CTDB_REC_RO_HAVE_READONLY;
893 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
894 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
896 if (data.dsize) {
897 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
900 ctdb_queue_packet(ctdb, &r->hdr);
901 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
902 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
904 talloc_free(r);
905 return;
908 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
909 tmp_count = c->hopcount;
910 bucket = 0;
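	/* bucket 0 counts hopcount 0; bucket b (b >= 1) counts hopcounts in
	   the range [4^(b-1), 4^b - 1], so the buckets grow by powers of four
	   with the last bucket absorbing everything larger */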
911 while (tmp_count) {
912 tmp_count >>= 2;
913 bucket++;
915 if (bucket >= MAX_COUNT_BUCKETS) {
916 bucket = MAX_COUNT_BUCKETS - 1;
918 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
919 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
920 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
922 /* If this database supports sticky records, then check if the
923 hopcount is big. If it is, the record is hot and we should make
924 it sticky.
926 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
927 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
931 /* if the calling node has done enough consecutive calls on the same
932 record, or if it requested an immediate migration, then give it
933 the record
935 if ( c->hdr.srcnode != ctdb->pnn &&
936 ((header.laccessor == c->hdr.srcnode
937 && header.lacount >= ctdb->tunable.max_lacount
938 && ctdb->tunable.max_lacount != 0)
939 || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
940 if (ctdb_db->transaction_active) {
941 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
942 " of key %s while transaction is active\n",
943 (char *)call->key.dptr));
944 } else {
945 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
946 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
947 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
948 talloc_free(data.dptr);
950 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
951 if (ret != 0) {
952 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
954 return;
958 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
959 if (ret != 0) {
960 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
961 call->status = -1;
964 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
965 if (ret != 0) {
966 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
969 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
970 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
971 struct ctdb_reply_call);
972 CTDB_NO_MEMORY_FATAL(ctdb, r);
973 r->hdr.destnode = hdr->srcnode;
974 r->hdr.reqid = hdr->reqid;
975 r->status = call->status;
976 r->datalen = call->reply_data.dsize;
977 if (call->reply_data.dsize) {
978 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
981 ctdb_queue_packet(ctdb, &r->hdr);
983 talloc_free(r);
987 called when a CTDB_REPLY_CALL packet comes in
989 This packet comes in response to a CTDB_REQ_CALL request packet. It
990 contains any reply data from the call
992 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
994 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
995 struct ctdb_call_state *state;
997 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
998 if (state == NULL) {
999 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1000 return;
1003 if (hdr->reqid != state->reqid) {
1004 /* we found a record but it was the wrong one */
1005 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1006 return;
1010 /* read only delegation processing */
1011 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1012 * delegation since we may need to update the record header
1014 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1015 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1016 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1017 struct ctdb_ltdb_header oldheader;
1018 TDB_DATA key, data, olddata;
1019 int ret;
1021 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1022 goto finished_ro;
1026 key.dsize = state->c->keylen;
1027 key.dptr = state->c->data;
1028 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1029 ctdb_call_input_pkt, ctdb, false);
1030 if (ret == -2) {
1031 return;
1033 if (ret != 0) {
1034 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1035 return;
1038 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1039 if (ret != 0) {
1040 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1041 ctdb_ltdb_unlock(ctdb_db, key);
1042 goto finished_ro;
1045 if (header->rsn <= oldheader.rsn) {
1046 ctdb_ltdb_unlock(ctdb_db, key);
1047 goto finished_ro;
1050 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1051 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1052 ctdb_ltdb_unlock(ctdb_db, key);
1053 goto finished_ro;
1056 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1057 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1058 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1059 if (ret != 0) {
1060 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1061 ctdb_ltdb_unlock(ctdb_db, key);
1062 goto finished_ro;
1065 ctdb_ltdb_unlock(ctdb_db, key);
1067 finished_ro:
1069 state->call->reply_data.dptr = c->data;
1070 state->call->reply_data.dsize = c->datalen;
1071 state->call->status = c->status;
1073 talloc_steal(state, c);
1075 state->state = CTDB_CALL_DONE;
1076 if (state->async.fn) {
1077 state->async.fn(state);
1083 called when a CTDB_REPLY_DMASTER packet comes in
1085 This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1086 request packet. It means that the current dmaster wants to give us
1087 the dmaster role.
1089 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1091 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1092 struct ctdb_db_context *ctdb_db;
1093 TDB_DATA key, data;
1094 uint32_t record_flags = 0;
1095 size_t len;
1096 int ret;
1098 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1099 if (ctdb_db == NULL) {
1100 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1101 return;
1104 key.dptr = c->data;
1105 key.dsize = c->keylen;
1106 data.dptr = &c->data[key.dsize];
1107 data.dsize = c->datalen;
1108 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1109 + sizeof(uint32_t);
1110 if (len <= c->hdr.length) {
1111 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
1114 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1115 ctdb_call_input_pkt, ctdb, false);
1116 if (ret == -2) {
1117 return;
1119 if (ret != 0) {
1120 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1121 return;
1124 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1129 called when a CTDB_REPLY_ERROR packet comes in
1131 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1133 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1134 struct ctdb_call_state *state;
1136 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1137 if (state == NULL) {
1138 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1139 ctdb->pnn, hdr->reqid));
1140 return;
1143 if (hdr->reqid != state->reqid) {
1144 /* we found a record but it was the wrong one */
1145 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1146 return;
1149 talloc_steal(state, c);
1151 state->state = CTDB_CALL_ERROR;
1152 state->errmsg = (char *)c->msg;
1153 if (state->async.fn) {
1154 state->async.fn(state);
1160 destroy a ctdb_call
1162 static int ctdb_call_destructor(struct ctdb_call_state *state)
1164 DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1165 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1166 return 0;
1171 called when a ctdb_call needs to be resent after a reconfigure event
1173 static void ctdb_call_resend(struct ctdb_call_state *state)
1175 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1177 state->generation = ctdb->vnn_map->generation;
1179 /* use a new reqid, in case the old reply does eventually come in */
1180 ctdb_reqid_remove(ctdb, state->reqid);
1181 state->reqid = ctdb_reqid_new(ctdb, state);
1182 state->c->hdr.reqid = state->reqid;
1184 /* update the generation count for this request, so it's valid with the new vnn_map */
1185 state->c->hdr.generation = state->generation;
1187 /* send the packet to ourselves, it will be redirected appropriately */
1188 state->c->hdr.destnode = ctdb->pnn;
1190 ctdb_queue_packet(ctdb, &state->c->hdr);
1191 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1195 resend all pending calls on recovery
1197 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1199 struct ctdb_call_state *state, *next;
1200 for (state=ctdb->pending_calls;state;state=next) {
1201 next = state->next;
1202 ctdb_call_resend(state);
1207 this allows the caller to set up an async.fn
1209 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1210 struct timeval t, void *private_data)
1212 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1213 if (state->async.fn) {
1214 state->async.fn(state);
1220 construct an event-driven local ctdb_call
1222 this is used so that locally processed ctdb_call requests are processed
1223 in an event-driven manner
1225 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1226 struct ctdb_call *call,
1227 struct ctdb_ltdb_header *header,
1228 TDB_DATA *data)
1230 struct ctdb_call_state *state;
1231 struct ctdb_context *ctdb = ctdb_db->ctdb;
1232 int ret;
1234 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1235 CTDB_NO_MEMORY_NULL(ctdb, state);
1237 talloc_steal(state, data->dptr);
1239 state->state = CTDB_CALL_DONE;
1240 state->call = talloc(state, struct ctdb_call);
1241 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1242 *(state->call) = *call;
1243 state->ctdb_db = ctdb_db;
1245 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
1246 if (ret != 0) {
1247 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1250 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1252 return state;
1257 make a remote ctdb call - async send. Called in daemon context.
1259 This constructs a ctdb_call request and queues it for processing.
1260 This call never blocks.
1262 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1263 struct ctdb_call *call,
1264 struct ctdb_ltdb_header *header)
1266 uint32_t len;
1267 struct ctdb_call_state *state;
1268 struct ctdb_context *ctdb = ctdb_db->ctdb;
1270 if (ctdb->methods == NULL) {
1271 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1272 return NULL;
1275 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1276 CTDB_NO_MEMORY_NULL(ctdb, state);
1277 state->call = talloc(state, struct ctdb_call);
1278 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1280 state->reqid = ctdb_reqid_new(ctdb, state);
1281 state->ctdb_db = ctdb_db;
1282 talloc_set_destructor(state, ctdb_call_destructor);
1284 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1285 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1286 struct ctdb_req_call);
1287 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1288 state->c->hdr.destnode = header->dmaster;
1290 /* this limits us to 16k outstanding messages - not unreasonable */
1291 state->c->hdr.reqid = state->reqid;
1292 state->c->flags = call->flags;
1293 state->c->db_id = ctdb_db->db_id;
1294 state->c->callid = call->call_id;
1295 state->c->hopcount = 0;
1296 state->c->keylen = call->key.dsize;
1297 state->c->calldatalen = call->call_data.dsize;
1298 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1299 memcpy(&state->c->data[call->key.dsize],
1300 call->call_data.dptr, call->call_data.dsize);
1301 *(state->call) = *call;
1302 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1303 state->call->key.dptr = &state->c->data[0];
1305 state->state = CTDB_CALL_WAIT;
1306 state->generation = ctdb->vnn_map->generation;
1308 DLIST_ADD(ctdb->pending_calls, state);
1310 ctdb_queue_packet(ctdb, &state->c->hdr);
1312 return state;
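/*
  Illustrative usage sketch (hypothetical caller, for exposition only):
  pairing ctdb_daemon_call_send_remote() with ctdb_daemon_call_recv().
*/
static int example_remote_call(struct ctdb_db_context *ctdb_db,
			       struct ctdb_call *call,
			       struct ctdb_ltdb_header *header)
{
	struct ctdb_call_state *state;

	/* queue a CTDB_REQ_CALL towards the record's current dmaster */
	state = ctdb_daemon_call_send_remote(ctdb_db, call, header);
	if (state == NULL) {
		return -1;
	}

	/* run the event loop until the reply (or an error) arrives and
	   copy the reply data into the caller's ctdb_call structure */
	return ctdb_daemon_call_recv(state, call);
}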
1316 make a remote ctdb call - async recv - called in daemon context
1318 This is called when the program wants to wait for a ctdb_call to complete and get the
1319 results. This call will block unless the call has already completed.
1321 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1323 while (state->state < CTDB_CALL_DONE) {
1324 event_loop_once(state->ctdb_db->ctdb->ev);
1326 if (state->state != CTDB_CALL_DONE) {
1327 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1328 talloc_free(state);
1329 return -1;
1332 if (state->call->reply_data.dsize) {
1333 call->reply_data.dptr = talloc_memdup(call,
1334 state->call->reply_data.dptr,
1335 state->call->reply_data.dsize);
1336 call->reply_data.dsize = state->call->reply_data.dsize;
1337 } else {
1338 call->reply_data.dptr = NULL;
1339 call->reply_data.dsize = 0;
1341 call->status = state->call->status;
1342 talloc_free(state);
1343 return 0;
1348 send a keepalive packet to the other node
1350 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1352 struct ctdb_req_keepalive *r;
1354 if (ctdb->methods == NULL) {
1355 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1356 return;
1359 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1360 sizeof(struct ctdb_req_keepalive),
1361 struct ctdb_req_keepalive);
1362 CTDB_NO_MEMORY_FATAL(ctdb, r);
1363 r->hdr.destnode = destnode;
1364 r->hdr.reqid = 0;
1366 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1368 ctdb_queue_packet(ctdb, &r->hdr);
1370 talloc_free(r);
1375 struct revokechild_deferred_call {
1376 struct ctdb_context *ctdb;
1377 struct ctdb_req_header *hdr;
1378 deferred_requeue_fn fn;
1379 void *ctx;
1382 struct revokechild_handle {
1383 struct revokechild_handle *next, *prev;
1384 struct ctdb_context *ctdb;
1385 struct ctdb_db_context *ctdb_db;
1386 struct fd_event *fde;
1387 int status;
1388 int fd[2];
1389 pid_t child;
1390 TDB_DATA key;
1393 struct revokechild_requeue_handle {
1394 struct ctdb_context *ctdb;
1395 struct ctdb_req_header *hdr;
1396 deferred_requeue_fn fn;
1397 void *ctx;
1400 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1401 struct timeval t, void *private_data)
1403 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1405 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1406 talloc_free(requeue_handle);
1409 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1411 struct ctdb_context *ctdb = deferred_call->ctdb;
1412 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1413 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1415 requeue_handle->ctdb = ctdb;
1416 requeue_handle->hdr = deferred_call->hdr;
1417 requeue_handle->fn = deferred_call->fn;
1418 requeue_handle->ctx = deferred_call->ctx;
1419 talloc_steal(requeue_handle, requeue_handle->hdr);
1421 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1422 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1424 return 0;
1428 static int revokechild_destructor(struct revokechild_handle *rc)
1430 if (rc->fde != NULL) {
1431 talloc_free(rc->fde);
1434 if (rc->fd[0] != -1) {
1435 close(rc->fd[0]);
1437 if (rc->fd[1] != -1) {
1438 close(rc->fd[1]);
1440 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1442 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1443 return 0;
1446 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1447 uint16_t flags, void *private_data)
1449 struct revokechild_handle *rc = talloc_get_type(private_data,
1450 struct revokechild_handle);
1451 int ret;
1452 char c;
1454 ret = read(rc->fd[0], &c, 1);
1455 if (ret != 1) {
1456 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1457 rc->status = -1;
1458 talloc_free(rc);
1459 return;
1461 if (c != 0) {
1462 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1463 rc->status = -1;
1464 talloc_free(rc);
1465 return;
1468 talloc_free(rc);
1471 struct ctdb_revoke_state {
1472 struct ctdb_db_context *ctdb_db;
1473 TDB_DATA key;
1474 struct ctdb_ltdb_header *header;
1475 TDB_DATA data;
1476 int count;
1477 int status;
1478 int finished;
1481 static void update_record_cb(struct ctdb_client_control_state *state)
1483 struct ctdb_revoke_state *revoke_state;
1484 int ret;
1485 int32_t res;
1487 if (state == NULL) {
1488 return;
1490 revoke_state = state->async.private_data;
1492 state->async.fn = NULL;
1493 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1494 if ((ret != 0) || (res != 0)) {
1495 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1496 revoke_state->status = -1;
1499 revoke_state->count--;
1500 if (revoke_state->count <= 0) {
1501 revoke_state->finished = 1;
1505 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1507 struct ctdb_revoke_state *revoke_state = private_data;
1508 struct ctdb_client_control_state *state;
1510 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1511 if (state == NULL) {
1512 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1513 revoke_state->status = -1;
1514 return;
1516 state->async.fn = update_record_cb;
1517 state->async.private_data = revoke_state;
1519 revoke_state->count++;
1523 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1524 struct timeval yt, void *private_data)
1526 struct ctdb_revoke_state *state = private_data;
1528 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1529 state->finished = 1;
1530 state->status = -1;
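/*
  Descriptive note on ctdb_revoke_all_delegations() below: it runs in the
  forked revoke child, pushes an updated copy of the record to every node
  listed in the tracking database, waits for the replies (bounded by a
  5 second timeout), and then, under the chainlock, bumps the rsn and sets
  CTDB_REC_RO_REVOKE_COMPLETE so that ctdb_request_call() clears the
  read-only flags the next time the record is accessed.
*/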
1533 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1535 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1536 int status;
1538 state->ctdb_db = ctdb_db;
1539 state->key = key;
1540 state->header = header;
1541 state->data = data;
1543 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1545 event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
1547 while (state->finished == 0) {
1548 event_loop_once(ctdb->ev);
1551 status = state->status;
1553 if (status == 0) {
1554 struct ctdb_ltdb_header new_header;
1555 TDB_DATA new_data;
1557 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1558 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1559 talloc_free(state);
1560 return -1;
1562 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1563 ctdb_ltdb_unlock(ctdb_db, key);
1564 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1565 talloc_free(state);
1566 return -1;
1568 header->rsn++;
1569 if (new_header.rsn > header->rsn) {
1570 ctdb_ltdb_unlock(ctdb_db, key);
1571 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1572 talloc_free(state);
1573 return -1;
1575 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1576 ctdb_ltdb_unlock(ctdb_db, key);
1577 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1578 talloc_free(state);
1579 return -1;
1581 new_header.rsn++;
1582 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1583 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1584 ctdb_ltdb_unlock(ctdb_db, key);
1585 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1586 talloc_free(state);
1587 return -1;
1589 ctdb_ltdb_unlock(ctdb_db, key);
1592 talloc_free(state);
1593 return status;
1597 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1599 TDB_DATA tdata;
1600 struct revokechild_handle *rc;
1601 pid_t parent = getpid();
1602 int ret;
1604 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1605 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1606 header->rsn -= 1;
1608 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1609 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1610 return -1;
1613 tdata = tdb_fetch(ctdb_db->rottdb, key);
1614 if (tdata.dsize > 0) {
1615 uint8_t *tmp;
1617 tmp = tdata.dptr;
1618 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1619 free(tmp);
1622 rc->status = 0;
1623 rc->ctdb = ctdb;
1624 rc->ctdb_db = ctdb_db;
1625 rc->fd[0] = -1;
1626 rc->fd[1] = -1;
1628 talloc_set_destructor(rc, revokechild_destructor);
1630 rc->key.dsize = key.dsize;
1631 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1632 if (rc->key.dptr == NULL) {
1633 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1634 talloc_free(rc);
1635 return -1;
1638 ret = pipe(rc->fd);
1639 if (ret != 0) {
1640 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1641 talloc_free(rc);
1642 return -1;
1646 rc->child = ctdb_fork(ctdb);
1647 if (rc->child == (pid_t)-1) {
1648 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1649 talloc_free(rc);
1650 return -1;
1653 if (rc->child == 0) {
1654 char c = 0;
1655 close(rc->fd[0]);
1656 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1658 ctdb_set_process_name("ctdb_revokechild");
1659 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1660 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1661 c = 1;
1662 goto child_finished;
1665 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1667 child_finished:
1668 write(rc->fd[1], &c, 1);
1669 /* make sure we die when our parent dies */
1670 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1671 sleep(5);
1673 _exit(0);
1676 close(rc->fd[1]);
1677 rc->fd[1] = -1;
1678 set_close_on_exec(rc->fd[0]);
1680 /* This is an active revokechild child process */
1681 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1683 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1684 EVENT_FD_READ, revokechild_handler,
1685 (void *)rc);
1686 if (rc->fde == NULL) {
1687 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1688 talloc_free(rc);
1690 tevent_fd_set_auto_close(rc->fde);
1692 return 0;
1695 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1697 struct revokechild_handle *rc;
1698 struct revokechild_deferred_call *deferred_call;
1700 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1701 if (rc->key.dsize == 0) {
1702 continue;
1704 if (rc->key.dsize != key.dsize) {
1705 continue;
1707 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1708 break;
1712 if (rc == NULL) {
1713 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1714 return -1;
1717 deferred_call = talloc(rc, struct revokechild_deferred_call);
1718 if (deferred_call == NULL) {
1719 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1720 return -1;
1723 deferred_call->ctdb = ctdb;
1724 deferred_call->hdr = hdr;
1725 deferred_call->fn = fn;
1726 deferred_call->ctx = call_context;
1728 talloc_set_destructor(deferred_call, deferred_call_destructor);
1729 talloc_steal(deferred_call, hdr);
1731 return 0;