lib: Fix blank line endings
[Samba.git] / ctdb / server / ctdb_call.c
blob1830b4a7c5b9b0f22ff76bb35dd88f3dcf75a434
1 /*
2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record {
32 struct ctdb_context *ctdb;
33 struct ctdb_db_context *ctdb_db;
34 TDB_CONTEXT *pindown;
38 find the ctdb_db from a db index
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
46 break;
49 return ctdb_db;
53 a varient of input packet that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
63 send an error reply
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
70 const char *fmt, ...)
72 va_list ap;
73 struct ctdb_reply_error *r;
74 char *msg;
75 int msglen, len;
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79 return;
82 va_start(ap, fmt);
83 msg = talloc_vasprintf(ctdb, fmt, ap);
84 if (msg == NULL) {
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
87 va_end(ap);
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
97 r->status = status;
98 r->msglen = msglen;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
103 talloc_free(msg);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. Note that this works because of this: When
117 * a record is migrated off a node, then the new DMASTER is stored
118 * in the record's copy on the former DMASTER.
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121 struct ctdb_db_context *ctdb_db,
122 TDB_DATA key,
123 struct ctdb_req_call *c,
124 struct ctdb_ltdb_header *header)
126 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
128 c->hdr.destnode = lmaster;
129 if (ctdb->pnn == lmaster) {
130 c->hdr.destnode = header->dmaster;
132 c->hopcount++;
134 if (c->hopcount%100 > 95) {
135 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
136 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
139 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
140 header->dmaster, c->hdr.destnode));
143 ctdb_queue_packet(ctdb, &c->hdr);
148 send a dmaster reply
150 caller must have the chainlock before calling this routine. Caller must be
151 the lmaster
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154 struct ctdb_ltdb_header *header,
155 TDB_DATA key, TDB_DATA data,
156 uint32_t new_dmaster,
157 uint32_t reqid)
159 struct ctdb_context *ctdb = ctdb_db->ctdb;
160 struct ctdb_reply_dmaster *r;
161 int ret, len;
162 TALLOC_CTX *tmp_ctx;
164 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
166 return;
169 header->dmaster = new_dmaster;
170 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
171 if (ret != 0) {
172 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
173 return;
176 if (ctdb->methods == NULL) {
177 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
178 return;
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx = talloc_new(ctdb);
185 /* send the CTDB_REPLY_DMASTER */
186 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188 struct ctdb_reply_dmaster);
189 CTDB_NO_MEMORY_FATAL(ctdb, r);
191 r->hdr.destnode = new_dmaster;
192 r->hdr.reqid = reqid;
193 r->rsn = header->rsn;
194 r->keylen = key.dsize;
195 r->datalen = data.dsize;
196 r->db_id = ctdb_db->db_id;
197 memcpy(&r->data[0], key.dptr, key.dsize);
198 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
199 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
201 ctdb_queue_packet(ctdb, &r->hdr);
203 talloc_free(tmp_ctx);
207 send a dmaster request (give another node the dmaster for a record)
209 This is always sent to the lmaster, which ensures that the lmaster
210 always knows who the dmaster is. The lmaster will then send a
211 CTDB_REPLY_DMASTER to the new dmaster
213 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
214 struct ctdb_req_call *c,
215 struct ctdb_ltdb_header *header,
216 TDB_DATA *key, TDB_DATA *data)
218 struct ctdb_req_dmaster *r;
219 struct ctdb_context *ctdb = ctdb_db->ctdb;
220 int len;
221 uint32_t lmaster = ctdb_lmaster(ctdb, key);
223 if (ctdb->methods == NULL) {
224 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
225 return;
228 if (data->dsize != 0) {
229 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
232 if (lmaster == ctdb->pnn) {
233 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
234 c->hdr.srcnode, c->hdr.reqid);
235 return;
238 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
239 + sizeof(uint32_t);
240 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
241 struct ctdb_req_dmaster);
242 CTDB_NO_MEMORY_FATAL(ctdb, r);
243 r->hdr.destnode = lmaster;
244 r->hdr.reqid = c->hdr.reqid;
245 r->db_id = c->db_id;
246 r->rsn = header->rsn;
247 r->dmaster = c->hdr.srcnode;
248 r->keylen = key->dsize;
249 r->datalen = data->dsize;
250 memcpy(&r->data[0], key->dptr, key->dsize);
251 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
252 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
254 header->dmaster = c->hdr.srcnode;
255 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
256 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
259 ctdb_queue_packet(ctdb, &r->hdr);
261 talloc_free(r);
264 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
265 struct timeval t, void *private_data)
267 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
268 struct ctdb_sticky_record);
270 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
271 if (sr->pindown != NULL) {
272 talloc_free(sr->pindown);
273 sr->pindown = NULL;
277 static int
278 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
280 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
281 uint32_t *k;
282 struct ctdb_sticky_record *sr;
284 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
285 if (k == NULL) {
286 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
287 talloc_free(tmp_ctx);
288 return -1;
291 k[0] = (key.dsize + 3) / 4 + 1;
292 memcpy(&k[1], key.dptr, key.dsize);
294 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
295 if (sr == NULL) {
296 talloc_free(tmp_ctx);
297 return 0;
300 talloc_free(tmp_ctx);
302 if (sr->pindown == NULL) {
303 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
304 sr->pindown = talloc_new(sr);
305 if (sr->pindown == NULL) {
306 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
307 return -1;
309 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
312 return 0;
316 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
319 must be called with the chainlock held. This function releases the chainlock
321 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
322 struct ctdb_req_header *hdr,
323 TDB_DATA key, TDB_DATA data,
324 uint64_t rsn, uint32_t record_flags)
326 struct ctdb_call_state *state;
327 struct ctdb_context *ctdb = ctdb_db->ctdb;
328 struct ctdb_ltdb_header header;
329 int ret;
331 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
333 ZERO_STRUCT(header);
334 header.rsn = rsn;
335 header.dmaster = ctdb->pnn;
336 header.flags = record_flags;
338 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
340 if (state) {
341 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
343 * We temporarily add the VACUUM_MIGRATED flag to
344 * the record flags, so that ctdb_ltdb_store can
345 * decide whether the record should be stored or
346 * deleted.
348 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
352 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
353 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
355 ret = ctdb_ltdb_unlock(ctdb_db, key);
356 if (ret != 0) {
357 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
359 return;
362 /* we just became DMASTER and this database is "sticky",
363 see if the record is flagged as "hot" and set up a pin-down
364 context to stop migrations for a little while if so
366 if (ctdb_db->sticky) {
367 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
370 if (state == NULL) {
371 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372 ctdb->pnn, hdr->reqid, hdr->srcnode));
374 ret = ctdb_ltdb_unlock(ctdb_db, key);
375 if (ret != 0) {
376 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
378 return;
381 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
382 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
384 ret = ctdb_ltdb_unlock(ctdb_db, key);
385 if (ret != 0) {
386 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
388 return;
391 if (hdr->reqid != state->reqid) {
392 /* we found a record but it was the wrong one */
393 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
395 ret = ctdb_ltdb_unlock(ctdb_db, key);
396 if (ret != 0) {
397 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
399 return;
402 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
404 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
405 if (ret != 0) {
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 state->state = CTDB_CALL_DONE;
410 if (state->async.fn) {
411 state->async.fn(state);
418 called when a CTDB_REQ_DMASTER packet comes in
420 this comes into the lmaster for a record when the current dmaster
421 wants to give up the dmaster role and give it to someone else
423 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
425 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
426 TDB_DATA key, data, data2;
427 struct ctdb_ltdb_header header;
428 struct ctdb_db_context *ctdb_db;
429 uint32_t record_flags = 0;
430 size_t len;
431 int ret;
433 key.dptr = c->data;
434 key.dsize = c->keylen;
435 data.dptr = c->data + c->keylen;
436 data.dsize = c->datalen;
437 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
438 + sizeof(uint32_t);
439 if (len <= c->hdr.length) {
440 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
443 ctdb_db = find_ctdb_db(ctdb, c->db_id);
444 if (!ctdb_db) {
445 ctdb_send_error(ctdb, hdr, -1,
446 "Unknown database in request. db_id==0x%08x",
447 c->db_id);
448 return;
451 /* fetch the current record */
452 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
453 ctdb_call_input_pkt, ctdb, false);
454 if (ret == -1) {
455 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
456 return;
458 if (ret == -2) {
459 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
460 return;
463 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
464 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
465 ctdb->pnn, ctdb_lmaster(ctdb, &key),
466 hdr->generation, ctdb->vnn_map->generation));
467 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
470 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
471 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
473 /* its a protocol error if the sending node is not the current dmaster */
474 if (header.dmaster != hdr->srcnode) {
475 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
476 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
477 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
478 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
479 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
480 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
481 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
483 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
484 ctdb_ltdb_unlock(ctdb_db, key);
485 return;
489 if (header.rsn > c->rsn) {
490 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
491 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
492 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
493 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
496 /* use the rsn from the sending node */
497 header.rsn = c->rsn;
499 /* store the record flags from the sending node */
500 header.flags = record_flags;
502 /* check if the new dmaster is the lmaster, in which case we
503 skip the dmaster reply */
504 if (c->dmaster == ctdb->pnn) {
505 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
506 } else {
507 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
509 ret = ctdb_ltdb_unlock(ctdb_db, key);
510 if (ret != 0) {
511 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
516 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
517 struct timeval t, void *private_data)
519 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
520 struct ctdb_sticky_record);
521 talloc_free(sr);
524 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
526 if (data) {
527 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
528 talloc_free(data);
530 return parm;
533 static int
534 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
536 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
537 uint32_t *k;
538 struct ctdb_sticky_record *sr;
540 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
541 if (k == NULL) {
542 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
543 talloc_free(tmp_ctx);
544 return -1;
547 k[0] = (key.dsize + 3) / 4 + 1;
548 memcpy(&k[1], key.dptr, key.dsize);
550 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
551 if (sr != NULL) {
552 talloc_free(tmp_ctx);
553 return 0;
556 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
557 if (sr == NULL) {
558 talloc_free(tmp_ctx);
559 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
560 return -1;
563 sr->ctdb = ctdb;
564 sr->ctdb_db = ctdb_db;
565 sr->pindown = NULL;
567 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
568 ctdb->tunable.sticky_duration,
569 ctdb_db->db_name, ctdb_hash(&key)));
571 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
573 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
575 talloc_free(tmp_ctx);
576 return 0;
/*
  carries a deferred request from pinned_down_destructor() to the
  pinned_down_requeue() event callback
*/
struct pinned_down_requeue_handle {
	/* daemon context the packet is re-injected into */
	struct ctdb_context *ctdb;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
};
/*
  a request that was deferred because its record is pinned down;
  allocated as a talloc child of the sticky record's pindown context
*/
struct pinned_down_deferred_call {
	/* daemon context the request arrived on */
	struct ctdb_context *ctdb;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
};
589 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
590 struct timeval t, void *private_data)
592 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
593 struct ctdb_context *ctdb = handle->ctdb;
595 talloc_steal(ctdb, handle->hdr);
596 ctdb_call_input_pkt(ctdb, handle->hdr);
598 talloc_free(handle);
601 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
603 struct ctdb_context *ctdb = pinned_down->ctdb;
604 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
606 handle->ctdb = pinned_down->ctdb;
607 handle->hdr = pinned_down->hdr;
608 talloc_steal(handle, handle->hdr);
610 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
612 return 0;
615 static int
616 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
618 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
619 uint32_t *k;
620 struct ctdb_sticky_record *sr;
621 struct pinned_down_deferred_call *pinned_down;
623 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
624 if (k == NULL) {
625 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
626 talloc_free(tmp_ctx);
627 return -1;
630 k[0] = (key.dsize + 3) / 4 + 1;
631 memcpy(&k[1], key.dptr, key.dsize);
633 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
634 if (sr == NULL) {
635 talloc_free(tmp_ctx);
636 return -1;
639 talloc_free(tmp_ctx);
641 if (sr->pindown == NULL) {
642 return -1;
645 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
646 if (pinned_down == NULL) {
647 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
648 return -1;
651 pinned_down->ctdb = ctdb;
652 pinned_down->hdr = hdr;
654 talloc_set_destructor(pinned_down, pinned_down_destructor);
655 talloc_steal(pinned_down, hdr);
657 return 0;
660 static void
661 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
663 int i, id;
665 /* smallest value is always at index 0 */
666 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
667 return;
670 /* see if we already know this key */
671 for (i = 0; i < MAX_HOT_KEYS; i++) {
672 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
673 continue;
675 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
676 continue;
678 /* found an entry for this key */
679 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
680 return;
682 ctdb_db->statistics.hot_keys[i].count = hopcount;
683 goto sort_keys;
686 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
687 id = ctdb_db->statistics.num_hot_keys;
688 ctdb_db->statistics.num_hot_keys++;
689 } else {
690 id = 0;
693 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
694 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
696 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
697 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
698 ctdb_db->statistics.hot_keys[id].count = hopcount;
699 DEBUG(DEBUG_NOTICE,("Updated hot key database=%s key=0x%08x id=%d hop_count=%d\n",
700 ctdb_db->db_name, ctdb_hash(&key), id, hopcount));
702 sort_keys:
703 for (i = 1; i < MAX_HOT_KEYS; i++) {
704 if (ctdb_db->statistics.hot_keys[i].count == 0) {
705 continue;
707 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
708 hopcount = ctdb_db->statistics.hot_keys[i].count;
709 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
710 ctdb_db->statistics.hot_keys[0].count = hopcount;
712 key = ctdb_db->statistics.hot_keys[i].key;
713 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
714 ctdb_db->statistics.hot_keys[0].key = key;
720 called when a CTDB_REQ_CALL packet comes in
722 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
724 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
725 TDB_DATA data;
726 struct ctdb_reply_call *r;
727 int ret, len;
728 struct ctdb_ltdb_header header;
729 struct ctdb_call *call;
730 struct ctdb_db_context *ctdb_db;
731 int tmp_count, bucket;
733 if (ctdb->methods == NULL) {
734 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
735 return;
739 ctdb_db = find_ctdb_db(ctdb, c->db_id);
740 if (!ctdb_db) {
741 ctdb_send_error(ctdb, hdr, -1,
742 "Unknown database in request. db_id==0x%08x",
743 c->db_id);
744 return;
747 call = talloc(hdr, struct ctdb_call);
748 CTDB_NO_MEMORY_FATAL(ctdb, call);
750 call->call_id = c->callid;
751 call->key.dptr = c->data;
752 call->key.dsize = c->keylen;
753 call->call_data.dptr = c->data + c->keylen;
754 call->call_data.dsize = c->calldatalen;
755 call->reply_data.dptr = NULL;
756 call->reply_data.dsize = 0;
759 /* If this record is pinned down we should defer the
760 request until the pindown times out
762 if (ctdb_db->sticky) {
763 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
764 DEBUG(DEBUG_WARNING,
765 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
766 talloc_free(call);
767 return;
772 /* determine if we are the dmaster for this key. This also
773 fetches the record data (if any), thus avoiding a 2nd fetch of the data
774 if the call will be answered locally */
776 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
777 ctdb_call_input_pkt, ctdb, false);
778 if (ret == -1) {
779 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
780 talloc_free(call);
781 return;
783 if (ret == -2) {
784 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
785 talloc_free(call);
786 return;
789 /* Dont do READONLY if we dont have a tracking database */
790 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
791 c->flags &= ~CTDB_WANT_READONLY;
794 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
795 header.flags &= ~CTDB_REC_RO_FLAGS;
796 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
797 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
798 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
799 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
801 /* and clear out the tracking data */
802 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
803 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
807 /* if we are revoking, we must defer all other calls until the revoke
808 * had completed.
810 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
811 talloc_free(data.dptr);
812 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
814 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
815 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
817 talloc_free(call);
818 return;
822 * If we are not the dmaster and are not hosting any delegations,
823 * then we redirect the request to the node than can answer it
824 * (the lmaster or the dmaster).
826 if ((header.dmaster != ctdb->pnn)
827 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
828 talloc_free(data.dptr);
829 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
831 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
832 if (ret != 0) {
833 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
835 talloc_free(call);
836 return;
839 if ( (!(c->flags & CTDB_WANT_READONLY))
840 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
841 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
842 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
843 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
845 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
847 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
848 ctdb_fatal(ctdb, "Failed to start record revoke");
850 talloc_free(data.dptr);
852 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
853 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
855 talloc_free(call);
857 return;
860 /* If this is the first request for delegation. bump rsn and set
861 * the delegations flag
863 if ((c->flags & CTDB_WANT_READONLY)
864 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
865 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
866 header.rsn += 3;
867 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
868 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
869 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
872 if ((c->flags & CTDB_WANT_READONLY)
873 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
874 TDB_DATA tdata;
876 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
877 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
878 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
880 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
881 ctdb_fatal(ctdb, "Failed to store trackingdb data");
883 free(tdata.dptr);
885 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
886 if (ret != 0) {
887 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
890 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
891 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
892 struct ctdb_reply_call);
893 CTDB_NO_MEMORY_FATAL(ctdb, r);
894 r->hdr.destnode = c->hdr.srcnode;
895 r->hdr.reqid = c->hdr.reqid;
896 r->status = 0;
897 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
898 header.rsn -= 2;
899 header.flags |= CTDB_REC_RO_HAVE_READONLY;
900 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
901 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
903 if (data.dsize) {
904 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
907 ctdb_queue_packet(ctdb, &r->hdr);
908 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
909 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
911 talloc_free(r);
912 talloc_free(call);
913 return;
916 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
917 tmp_count = c->hopcount;
918 bucket = 0;
919 while (tmp_count) {
920 tmp_count >>= 2;
921 bucket++;
923 if (bucket >= MAX_COUNT_BUCKETS) {
924 bucket = MAX_COUNT_BUCKETS - 1;
926 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
927 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
928 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
930 /* If this database supports sticky records, then check if the
931 hopcount is big. If it is it means the record is hot and we
932 should make it sticky.
934 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
935 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
939 /* Try if possible to migrate the record off to the caller node.
940 * From the clients perspective a fetch of the data is just as
941 * expensive as a migration.
943 if (c->hdr.srcnode != ctdb->pnn) {
944 if (ctdb_db->persistent_state) {
945 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
946 " of key %s while transaction is active\n",
947 (char *)call->key.dptr));
948 } else {
949 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
950 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
951 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
952 talloc_free(data.dptr);
954 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
955 if (ret != 0) {
956 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
959 talloc_free(call);
960 return;
963 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
964 if (ret != 0) {
965 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
966 call->status = -1;
969 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
970 if (ret != 0) {
971 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
974 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
975 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
976 struct ctdb_reply_call);
977 CTDB_NO_MEMORY_FATAL(ctdb, r);
978 r->hdr.destnode = hdr->srcnode;
979 r->hdr.reqid = hdr->reqid;
980 r->status = call->status;
981 r->datalen = call->reply_data.dsize;
982 if (call->reply_data.dsize) {
983 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
986 ctdb_queue_packet(ctdb, &r->hdr);
988 talloc_free(r);
989 talloc_free(call);
993 * called when a CTDB_REPLY_CALL packet comes in
995 * This packet comes in response to a CTDB_REQ_CALL request packet. It
996 * contains any reply data from the call
998 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1000 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
1001 struct ctdb_call_state *state;
1003 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1004 if (state == NULL) {
1005 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1006 return;
1009 if (hdr->reqid != state->reqid) {
1010 /* we found a record but it was the wrong one */
1011 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1012 return;
1016 /* read only delegation processing */
1017 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1018 * delegation since we may need to update the record header
1020 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1021 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1022 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1023 struct ctdb_ltdb_header oldheader;
1024 TDB_DATA key, data, olddata;
1025 int ret;
1027 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1028 goto finished_ro;
1029 return;
1032 key.dsize = state->c->keylen;
1033 key.dptr = state->c->data;
1034 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1035 ctdb_call_input_pkt, ctdb, false);
1036 if (ret == -2) {
1037 return;
1039 if (ret != 0) {
1040 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1041 return;
1044 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1045 if (ret != 0) {
1046 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1047 ctdb_ltdb_unlock(ctdb_db, key);
1048 goto finished_ro;
1051 if (header->rsn <= oldheader.rsn) {
1052 ctdb_ltdb_unlock(ctdb_db, key);
1053 goto finished_ro;
1056 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1057 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1058 ctdb_ltdb_unlock(ctdb_db, key);
1059 goto finished_ro;
1062 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1063 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1064 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1065 if (ret != 0) {
1066 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1067 ctdb_ltdb_unlock(ctdb_db, key);
1068 goto finished_ro;
1071 ctdb_ltdb_unlock(ctdb_db, key);
1073 finished_ro:
1075 state->call->reply_data.dptr = c->data;
1076 state->call->reply_data.dsize = c->datalen;
1077 state->call->status = c->status;
1079 talloc_steal(state, c);
1081 state->state = CTDB_CALL_DONE;
1082 if (state->async.fn) {
1083 state->async.fn(state);
1089 * called when a CTDB_REPLY_DMASTER packet comes in
1091 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1092 * request packet. It means that the current dmaster wants to give us
1093 * the dmaster role.
1095 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1097 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1098 struct ctdb_db_context *ctdb_db;
1099 TDB_DATA key, data;
1100 uint32_t record_flags = 0;
1101 size_t len;
1102 int ret;
1104 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1105 if (ctdb_db == NULL) {
1106 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1107 return;
1110 key.dptr = c->data;
1111 key.dsize = c->keylen;
1112 data.dptr = &c->data[key.dsize];
1113 data.dsize = c->datalen;
1114 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1115 + sizeof(uint32_t);
1116 if (len <= c->hdr.length) {
1117 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
1120 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1121 ctdb_call_input_pkt, ctdb, false);
1122 if (ret == -2) {
1123 return;
1125 if (ret != 0) {
1126 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1127 return;
1130 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1135 called when a CTDB_REPLY_ERROR packet comes in
1137 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1139 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1140 struct ctdb_call_state *state;
1142 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1143 if (state == NULL) {
1144 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1145 ctdb->pnn, hdr->reqid));
1146 return;
1149 if (hdr->reqid != state->reqid) {
1150 /* we found a record but it was the wrong one */
1151 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1152 return;
1155 talloc_steal(state, c);
1157 state->state = CTDB_CALL_ERROR;
1158 state->errmsg = (char *)c->msg;
1159 if (state->async.fn) {
1160 state->async.fn(state);
1166 destroy a ctdb_call
1168 static int ctdb_call_destructor(struct ctdb_call_state *state)
1170 DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1171 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1172 return 0;
1177 called when a ctdb_call needs to be resent after a reconfigure event
1179 static void ctdb_call_resend(struct ctdb_call_state *state)
1181 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1183 state->generation = ctdb->vnn_map->generation;
1185 /* use a new reqid, in case the old reply does eventually come in */
1186 ctdb_reqid_remove(ctdb, state->reqid);
1187 state->reqid = ctdb_reqid_new(ctdb, state);
1188 state->c->hdr.reqid = state->reqid;
1190 /* update the generation count for this request, so its valid with the new vnn_map */
1191 state->c->hdr.generation = state->generation;
1193 /* send the packet to ourselves, it will be redirected appropriately */
1194 state->c->hdr.destnode = ctdb->pnn;
1196 ctdb_queue_packet(ctdb, &state->c->hdr);
1197 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1201 resend all pending calls on recovery
1203 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1205 struct ctdb_call_state *state, *next;
1206 for (state=ctdb->pending_calls;state;state=next) {
1207 next = state->next;
1208 ctdb_call_resend(state);
1213 this allows the caller to setup a async.fn
1215 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1216 struct timeval t, void *private_data)
1218 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1219 if (state->async.fn) {
1220 state->async.fn(state);
1226 construct an event driven local ctdb_call
1228 this is used so that locally processed ctdb_call requests are processed
1229 in an event driven manner
1231 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1232 struct ctdb_call *call,
1233 struct ctdb_ltdb_header *header,
1234 TDB_DATA *data)
1236 struct ctdb_call_state *state;
1237 struct ctdb_context *ctdb = ctdb_db->ctdb;
1238 int ret;
1240 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1241 CTDB_NO_MEMORY_NULL(ctdb, state);
1243 talloc_steal(state, data->dptr);
1245 state->state = CTDB_CALL_DONE;
1246 state->call = talloc(state, struct ctdb_call);
1247 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1248 *(state->call) = *call;
1249 state->ctdb_db = ctdb_db;
1251 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1252 if (ret != 0) {
1253 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1256 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1258 return state;
1263 make a remote ctdb call - async send. Called in daemon context.
1265 This constructs a ctdb_call request and queues it for processing.
1266 This call never blocks.
1268 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1269 struct ctdb_call *call,
1270 struct ctdb_ltdb_header *header)
1272 uint32_t len;
1273 struct ctdb_call_state *state;
1274 struct ctdb_context *ctdb = ctdb_db->ctdb;
1276 if (ctdb->methods == NULL) {
1277 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1278 return NULL;
1281 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1282 CTDB_NO_MEMORY_NULL(ctdb, state);
1283 state->call = talloc(state, struct ctdb_call);
1284 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1286 state->reqid = ctdb_reqid_new(ctdb, state);
1287 state->ctdb_db = ctdb_db;
1288 talloc_set_destructor(state, ctdb_call_destructor);
1290 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1291 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1292 struct ctdb_req_call);
1293 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1294 state->c->hdr.destnode = header->dmaster;
1296 /* this limits us to 16k outstanding messages - not unreasonable */
1297 state->c->hdr.reqid = state->reqid;
1298 state->c->flags = call->flags;
1299 state->c->db_id = ctdb_db->db_id;
1300 state->c->callid = call->call_id;
1301 state->c->hopcount = 0;
1302 state->c->keylen = call->key.dsize;
1303 state->c->calldatalen = call->call_data.dsize;
1304 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1305 memcpy(&state->c->data[call->key.dsize],
1306 call->call_data.dptr, call->call_data.dsize);
1307 *(state->call) = *call;
1308 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1309 state->call->key.dptr = &state->c->data[0];
1311 state->state = CTDB_CALL_WAIT;
1312 state->generation = ctdb->vnn_map->generation;
1314 DLIST_ADD(ctdb->pending_calls, state);
1316 ctdb_queue_packet(ctdb, &state->c->hdr);
1318 return state;
1322 make a remote ctdb call - async recv - called in daemon context
1324 This is called when the program wants to wait for a ctdb_call to complete and get the
1325 results. This call will block unless the call has already completed.
1327 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1329 while (state->state < CTDB_CALL_DONE) {
1330 event_loop_once(state->ctdb_db->ctdb->ev);
1332 if (state->state != CTDB_CALL_DONE) {
1333 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1334 talloc_free(state);
1335 return -1;
1338 if (state->call->reply_data.dsize) {
1339 call->reply_data.dptr = talloc_memdup(call,
1340 state->call->reply_data.dptr,
1341 state->call->reply_data.dsize);
1342 call->reply_data.dsize = state->call->reply_data.dsize;
1343 } else {
1344 call->reply_data.dptr = NULL;
1345 call->reply_data.dsize = 0;
1347 call->status = state->call->status;
1348 talloc_free(state);
1349 return 0;
1354 send a keepalive packet to the other node
1356 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1358 struct ctdb_req_keepalive *r;
1360 if (ctdb->methods == NULL) {
1361 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1362 return;
1365 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1366 sizeof(struct ctdb_req_keepalive),
1367 struct ctdb_req_keepalive);
1368 CTDB_NO_MEMORY_FATAL(ctdb, r);
1369 r->hdr.destnode = destnode;
1370 r->hdr.reqid = 0;
1372 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1374 ctdb_queue_packet(ctdb, &r->hdr);
1376 talloc_free(r);
1381 struct revokechild_deferred_call {
1382 struct ctdb_context *ctdb;
1383 struct ctdb_req_header *hdr;
1384 deferred_requeue_fn fn;
1385 void *ctx;
1388 struct revokechild_handle {
1389 struct revokechild_handle *next, *prev;
1390 struct ctdb_context *ctdb;
1391 struct ctdb_db_context *ctdb_db;
1392 struct fd_event *fde;
1393 int status;
1394 int fd[2];
1395 pid_t child;
1396 TDB_DATA key;
1399 struct revokechild_requeue_handle {
1400 struct ctdb_context *ctdb;
1401 struct ctdb_req_header *hdr;
1402 deferred_requeue_fn fn;
1403 void *ctx;
1406 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1407 struct timeval t, void *private_data)
1409 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1411 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1412 talloc_free(requeue_handle);
1415 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1417 struct ctdb_context *ctdb = deferred_call->ctdb;
1418 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1419 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1421 requeue_handle->ctdb = ctdb;
1422 requeue_handle->hdr = deferred_call->hdr;
1423 requeue_handle->fn = deferred_call->fn;
1424 requeue_handle->ctx = deferred_call->ctx;
1425 talloc_steal(requeue_handle, requeue_handle->hdr);
1427 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1428 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1430 return 0;
1434 static int revokechild_destructor(struct revokechild_handle *rc)
1436 if (rc->fde != NULL) {
1437 talloc_free(rc->fde);
1440 if (rc->fd[0] != -1) {
1441 close(rc->fd[0]);
1443 if (rc->fd[1] != -1) {
1444 close(rc->fd[1]);
1446 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1448 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1449 return 0;
1452 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1453 uint16_t flags, void *private_data)
1455 struct revokechild_handle *rc = talloc_get_type(private_data,
1456 struct revokechild_handle);
1457 int ret;
1458 char c;
1460 ret = read(rc->fd[0], &c, 1);
1461 if (ret != 1) {
1462 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1463 rc->status = -1;
1464 talloc_free(rc);
1465 return;
1467 if (c != 0) {
1468 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1469 rc->status = -1;
1470 talloc_free(rc);
1471 return;
1474 talloc_free(rc);
1477 struct ctdb_revoke_state {
1478 struct ctdb_db_context *ctdb_db;
1479 TDB_DATA key;
1480 struct ctdb_ltdb_header *header;
1481 TDB_DATA data;
1482 int count;
1483 int status;
1484 int finished;
1487 static void update_record_cb(struct ctdb_client_control_state *state)
1489 struct ctdb_revoke_state *revoke_state;
1490 int ret;
1491 int32_t res;
1493 if (state == NULL) {
1494 return;
1496 revoke_state = state->async.private_data;
1498 state->async.fn = NULL;
1499 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1500 if ((ret != 0) || (res != 0)) {
1501 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1502 revoke_state->status = -1;
1505 revoke_state->count--;
1506 if (revoke_state->count <= 0) {
1507 revoke_state->finished = 1;
1511 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1513 struct ctdb_revoke_state *revoke_state = private_data;
1514 struct ctdb_client_control_state *state;
1516 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1517 if (state == NULL) {
1518 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1519 revoke_state->status = -1;
1520 return;
1522 state->async.fn = update_record_cb;
1523 state->async.private_data = revoke_state;
1525 revoke_state->count++;
1529 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1530 struct timeval yt, void *private_data)
1532 struct ctdb_revoke_state *state = private_data;
1534 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1535 state->finished = 1;
1536 state->status = -1;
1539 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1541 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1542 int status;
1544 state->ctdb_db = ctdb_db;
1545 state->key = key;
1546 state->header = header;
1547 state->data = data;
1549 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1551 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0), ctdb_revoke_timeout_handler, state);
1553 while (state->finished == 0) {
1554 event_loop_once(ctdb->ev);
1557 status = state->status;
1559 if (status == 0) {
1560 struct ctdb_ltdb_header new_header;
1561 TDB_DATA new_data;
1563 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1564 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1565 talloc_free(state);
1566 return -1;
1568 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1569 ctdb_ltdb_unlock(ctdb_db, key);
1570 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1571 talloc_free(state);
1572 return -1;
1574 header->rsn++;
1575 if (new_header.rsn > header->rsn) {
1576 ctdb_ltdb_unlock(ctdb_db, key);
1577 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1578 talloc_free(state);
1579 return -1;
1581 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1582 ctdb_ltdb_unlock(ctdb_db, key);
1583 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1584 talloc_free(state);
1585 return -1;
1587 new_header.rsn++;
1588 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1589 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1590 ctdb_ltdb_unlock(ctdb_db, key);
1591 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1592 talloc_free(state);
1593 return -1;
1595 ctdb_ltdb_unlock(ctdb_db, key);
1598 talloc_free(state);
1599 return status;
1603 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1605 TDB_DATA tdata;
1606 struct revokechild_handle *rc;
1607 pid_t parent = getpid();
1608 int ret;
1610 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1611 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1612 header->rsn -= 1;
1614 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1615 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1616 return -1;
1619 tdata = tdb_fetch(ctdb_db->rottdb, key);
1620 if (tdata.dsize > 0) {
1621 uint8_t *tmp;
1623 tmp = tdata.dptr;
1624 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1625 free(tmp);
1628 rc->status = 0;
1629 rc->ctdb = ctdb;
1630 rc->ctdb_db = ctdb_db;
1631 rc->fd[0] = -1;
1632 rc->fd[1] = -1;
1634 talloc_set_destructor(rc, revokechild_destructor);
1636 rc->key.dsize = key.dsize;
1637 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1638 if (rc->key.dptr == NULL) {
1639 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1640 talloc_free(rc);
1641 return -1;
1644 ret = pipe(rc->fd);
1645 if (ret != 0) {
1646 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1647 talloc_free(rc);
1648 return -1;
1652 rc->child = ctdb_fork(ctdb);
1653 if (rc->child == (pid_t)-1) {
1654 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1655 talloc_free(rc);
1656 return -1;
1659 if (rc->child == 0) {
1660 char c = 0;
1661 close(rc->fd[0]);
1662 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1664 ctdb_set_process_name("ctdb_revokechild");
1665 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1666 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1667 c = 1;
1668 goto child_finished;
1671 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1673 child_finished:
1674 write(rc->fd[1], &c, 1);
1675 /* make sure we die when our parent dies */
1676 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1677 sleep(5);
1679 _exit(0);
1682 close(rc->fd[1]);
1683 rc->fd[1] = -1;
1684 set_close_on_exec(rc->fd[0]);
1686 /* This is an active revokechild child process */
1687 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1689 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1690 EVENT_FD_READ, revokechild_handler,
1691 (void *)rc);
1692 if (rc->fde == NULL) {
1693 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1694 talloc_free(rc);
1696 tevent_fd_set_auto_close(rc->fde);
1698 return 0;
1701 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1703 struct revokechild_handle *rc;
1704 struct revokechild_deferred_call *deferred_call;
1706 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1707 if (rc->key.dsize == 0) {
1708 continue;
1710 if (rc->key.dsize != key.dsize) {
1711 continue;
1713 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1714 break;
1718 if (rc == NULL) {
1719 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1720 return -1;
1723 deferred_call = talloc(rc, struct revokechild_deferred_call);
1724 if (deferred_call == NULL) {
1725 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1726 return -1;
1729 deferred_call->ctdb = ctdb;
1730 deferred_call->hdr = hdr;
1731 deferred_call->fn = fn;
1732 deferred_call->ctx = call_context;
1734 talloc_set_destructor(deferred_call, deferred_call_destructor);
1735 talloc_steal(deferred_call, hdr);
1737 return 0;