ctdbd: remove a nonempty blank line
[Samba.git] / ctdb / server / ctdb_call.c
blobfeb3e61ee766ab58760f32ca70b90ef4042c11cf
/*
   ctdb_call protocol code

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
/*
  see http://wiki.samba.org/index.php/Samba_%26_Clustering for
  protocol design and packet details
*/
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
31 struct ctdb_sticky_record {
32 struct ctdb_context *ctdb;
33 struct ctdb_db_context *ctdb_db;
34 TDB_CONTEXT *pindown;
38 find the ctdb_db from a db index
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
46 break;
49 return ctdb_db;
53 a varient of input packet that can be used in lock requeue
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
63 send an error reply
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
70 const char *fmt, ...)
72 va_list ap;
73 struct ctdb_reply_error *r;
74 char *msg;
75 int msglen, len;
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
79 return;
82 va_start(ap, fmt);
83 msg = talloc_vasprintf(ctdb, fmt, ap);
84 if (msg == NULL) {
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
87 va_end(ap);
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
97 r->status = status;
98 r->msglen = msglen;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
103 talloc_free(msg);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. Note that this works because of this: When
117 * a record is migrated off a node, then the new DMASTER is stored
118 * in the record's copy on the former DMASTER.
120 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
121 struct ctdb_db_context *ctdb_db,
122 TDB_DATA key,
123 struct ctdb_req_call *c,
124 struct ctdb_ltdb_header *header)
126 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
128 c->hdr.destnode = lmaster;
129 if (ctdb->pnn == lmaster) {
130 c->hdr.destnode = header->dmaster;
132 c->hopcount++;
134 if (c->hopcount%100 == 99) {
135 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:0x%08x "
136 "key:0x%08x pnn:%d src:%d lmaster:%d "
137 "header->dmaster:%d dst:%d\n",
138 c->hopcount, ctdb_db->db_id, ctdb_hash(&key),
139 ctdb->pnn, c->hdr.srcnode, lmaster,
140 header->dmaster, c->hdr.destnode));
143 ctdb_queue_packet(ctdb, &c->hdr);
148 send a dmaster reply
150 caller must have the chainlock before calling this routine. Caller must be
151 the lmaster
153 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
154 struct ctdb_ltdb_header *header,
155 TDB_DATA key, TDB_DATA data,
156 uint32_t new_dmaster,
157 uint32_t reqid)
159 struct ctdb_context *ctdb = ctdb_db->ctdb;
160 struct ctdb_reply_dmaster *r;
161 int ret, len;
162 TALLOC_CTX *tmp_ctx;
164 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
165 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
166 return;
169 header->dmaster = new_dmaster;
170 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
171 if (ret != 0) {
172 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
173 return;
176 if (ctdb->methods == NULL) {
177 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
178 return;
181 /* put the packet on a temporary context, allowing us to safely free
182 it below even if ctdb_reply_dmaster() has freed it already */
183 tmp_ctx = talloc_new(ctdb);
185 /* send the CTDB_REPLY_DMASTER */
186 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
187 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
188 struct ctdb_reply_dmaster);
189 CTDB_NO_MEMORY_FATAL(ctdb, r);
191 r->hdr.destnode = new_dmaster;
192 r->hdr.reqid = reqid;
193 r->rsn = header->rsn;
194 r->keylen = key.dsize;
195 r->datalen = data.dsize;
196 r->db_id = ctdb_db->db_id;
197 memcpy(&r->data[0], key.dptr, key.dsize);
198 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
199 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
201 ctdb_queue_packet(ctdb, &r->hdr);
203 talloc_free(tmp_ctx);
207 send a dmaster request (give another node the dmaster for a record)
209 This is always sent to the lmaster, which ensures that the lmaster
210 always knows who the dmaster is. The lmaster will then send a
211 CTDB_REPLY_DMASTER to the new dmaster
213 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
214 struct ctdb_req_call *c,
215 struct ctdb_ltdb_header *header,
216 TDB_DATA *key, TDB_DATA *data)
218 struct ctdb_req_dmaster *r;
219 struct ctdb_context *ctdb = ctdb_db->ctdb;
220 int len;
221 uint32_t lmaster = ctdb_lmaster(ctdb, key);
223 if (ctdb->methods == NULL) {
224 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
225 return;
228 if (data->dsize != 0) {
229 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
232 if (lmaster == ctdb->pnn) {
233 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
234 c->hdr.srcnode, c->hdr.reqid);
235 return;
238 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
239 + sizeof(uint32_t);
240 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
241 struct ctdb_req_dmaster);
242 CTDB_NO_MEMORY_FATAL(ctdb, r);
243 r->hdr.destnode = lmaster;
244 r->hdr.reqid = c->hdr.reqid;
245 r->db_id = c->db_id;
246 r->rsn = header->rsn;
247 r->dmaster = c->hdr.srcnode;
248 r->keylen = key->dsize;
249 r->datalen = data->dsize;
250 memcpy(&r->data[0], key->dptr, key->dsize);
251 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
252 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
254 header->dmaster = c->hdr.srcnode;
255 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
256 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
259 ctdb_queue_packet(ctdb, &r->hdr);
261 talloc_free(r);
264 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
265 struct timeval t, void *private_data)
267 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
268 struct ctdb_sticky_record);
270 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
271 if (sr->pindown != NULL) {
272 talloc_free(sr->pindown);
273 sr->pindown = NULL;
277 static int
278 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
280 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
281 uint32_t *k;
282 struct ctdb_sticky_record *sr;
284 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
285 if (k == NULL) {
286 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
287 talloc_free(tmp_ctx);
288 return -1;
291 k[0] = (key.dsize + 3) / 4 + 1;
292 memcpy(&k[1], key.dptr, key.dsize);
294 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
295 if (sr == NULL) {
296 talloc_free(tmp_ctx);
297 return 0;
300 talloc_free(tmp_ctx);
302 if (sr->pindown == NULL) {
303 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
304 sr->pindown = talloc_new(sr);
305 if (sr->pindown == NULL) {
306 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
307 return -1;
309 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
312 return 0;
316 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
317 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
319 must be called with the chainlock held. This function releases the chainlock
321 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
322 struct ctdb_req_header *hdr,
323 TDB_DATA key, TDB_DATA data,
324 uint64_t rsn, uint32_t record_flags)
326 struct ctdb_call_state *state;
327 struct ctdb_context *ctdb = ctdb_db->ctdb;
328 struct ctdb_ltdb_header header;
329 int ret;
331 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
333 ZERO_STRUCT(header);
334 header.rsn = rsn;
335 header.dmaster = ctdb->pnn;
336 header.flags = record_flags;
338 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
340 if (state) {
341 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
343 * We temporarily add the VACUUM_MIGRATED flag to
344 * the record flags, so that ctdb_ltdb_store can
345 * decide whether the record should be stored or
346 * deleted.
348 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
352 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
353 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
355 ret = ctdb_ltdb_unlock(ctdb_db, key);
356 if (ret != 0) {
357 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
359 return;
362 /* we just became DMASTER and this database is "sticky",
363 see if the record is flagged as "hot" and set up a pin-down
364 context to stop migrations for a little while if so
366 if (ctdb_db->sticky) {
367 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
370 if (state == NULL) {
371 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
372 ctdb->pnn, hdr->reqid, hdr->srcnode));
374 ret = ctdb_ltdb_unlock(ctdb_db, key);
375 if (ret != 0) {
376 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
378 return;
381 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
382 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
384 ret = ctdb_ltdb_unlock(ctdb_db, key);
385 if (ret != 0) {
386 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
388 return;
391 if (hdr->reqid != state->reqid) {
392 /* we found a record but it was the wrong one */
393 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
395 ret = ctdb_ltdb_unlock(ctdb_db, key);
396 if (ret != 0) {
397 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
399 return;
402 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
404 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
405 if (ret != 0) {
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 state->state = CTDB_CALL_DONE;
410 if (state->async.fn) {
411 state->async.fn(state);
418 called when a CTDB_REQ_DMASTER packet comes in
420 this comes into the lmaster for a record when the current dmaster
421 wants to give up the dmaster role and give it to someone else
423 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
425 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
426 TDB_DATA key, data, data2;
427 struct ctdb_ltdb_header header;
428 struct ctdb_db_context *ctdb_db;
429 uint32_t record_flags = 0;
430 size_t len;
431 int ret;
433 key.dptr = c->data;
434 key.dsize = c->keylen;
435 data.dptr = c->data + c->keylen;
436 data.dsize = c->datalen;
437 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
438 + sizeof(uint32_t);
439 if (len <= c->hdr.length) {
440 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
443 ctdb_db = find_ctdb_db(ctdb, c->db_id);
444 if (!ctdb_db) {
445 ctdb_send_error(ctdb, hdr, -1,
446 "Unknown database in request. db_id==0x%08x",
447 c->db_id);
448 return;
451 /* fetch the current record */
452 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
453 ctdb_call_input_pkt, ctdb, false);
454 if (ret == -1) {
455 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
456 return;
458 if (ret == -2) {
459 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
460 return;
463 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
464 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
465 ctdb->pnn, ctdb_lmaster(ctdb, &key),
466 hdr->generation, ctdb->vnn_map->generation));
467 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
470 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
471 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
473 /* its a protocol error if the sending node is not the current dmaster */
474 if (header.dmaster != hdr->srcnode) {
475 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
476 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
477 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
478 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
479 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
480 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
481 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
483 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
484 ctdb_ltdb_unlock(ctdb_db, key);
485 return;
489 if (header.rsn > c->rsn) {
490 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
491 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
492 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
493 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
496 /* use the rsn from the sending node */
497 header.rsn = c->rsn;
499 /* store the record flags from the sending node */
500 header.flags = record_flags;
502 /* check if the new dmaster is the lmaster, in which case we
503 skip the dmaster reply */
504 if (c->dmaster == ctdb->pnn) {
505 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
506 } else {
507 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
509 ret = ctdb_ltdb_unlock(ctdb_db, key);
510 if (ret != 0) {
511 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
516 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
517 struct timeval t, void *private_data)
519 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
520 struct ctdb_sticky_record);
521 talloc_free(sr);
524 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
526 if (data) {
527 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
528 talloc_free(data);
530 return parm;
533 static int
534 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
536 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
537 uint32_t *k;
538 struct ctdb_sticky_record *sr;
540 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
541 if (k == NULL) {
542 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
543 talloc_free(tmp_ctx);
544 return -1;
547 k[0] = (key.dsize + 3) / 4 + 1;
548 memcpy(&k[1], key.dptr, key.dsize);
550 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
551 if (sr != NULL) {
552 talloc_free(tmp_ctx);
553 return 0;
556 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
557 if (sr == NULL) {
558 talloc_free(tmp_ctx);
559 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
560 return -1;
563 sr->ctdb = ctdb;
564 sr->ctdb_db = ctdb_db;
565 sr->pindown = NULL;
567 DEBUG(DEBUG_ERR,("Make record sticky in db %s\n", ctdb_db->db_name));
569 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
571 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
573 talloc_free(tmp_ctx);
574 return 0;
/*
  Passes a deferred request from pinned_down_destructor() to the
  zero-delay timer (pinned_down_requeue) that re-injects it.
*/
struct pinned_down_requeue_handle {
	struct ctdb_context *ctdb;	/* context the packet is re-input on */
	struct ctdb_req_header *hdr;	/* deferred packet, stolen onto this handle */
};
/*
  A request deferred while its record is pinned down.  Allocated as a
  talloc child of the sticky record's pindown context.  When that context
  is freed, pinned_down_destructor() requeues the request.
*/
struct pinned_down_deferred_call {
	struct ctdb_context *ctdb;
	struct ctdb_req_header *hdr;	/* deferred packet, stolen onto this struct */
};
587 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
588 struct timeval t, void *private_data)
590 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
591 struct ctdb_context *ctdb = handle->ctdb;
593 talloc_steal(ctdb, handle->hdr);
594 ctdb_call_input_pkt(ctdb, handle->hdr);
596 talloc_free(handle);
599 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
601 struct ctdb_context *ctdb = pinned_down->ctdb;
602 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
604 handle->ctdb = pinned_down->ctdb;
605 handle->hdr = pinned_down->hdr;
606 talloc_steal(handle, handle->hdr);
608 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
610 return 0;
613 static int
614 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
616 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
617 uint32_t *k;
618 struct ctdb_sticky_record *sr;
619 struct pinned_down_deferred_call *pinned_down;
621 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
622 if (k == NULL) {
623 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
624 talloc_free(tmp_ctx);
625 return -1;
628 k[0] = (key.dsize + 3) / 4 + 1;
629 memcpy(&k[1], key.dptr, key.dsize);
631 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
632 if (sr == NULL) {
633 talloc_free(tmp_ctx);
634 return -1;
637 talloc_free(tmp_ctx);
639 if (sr->pindown == NULL) {
640 return -1;
643 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
644 if (pinned_down == NULL) {
645 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
646 return -1;
649 pinned_down->ctdb = ctdb;
650 pinned_down->hdr = hdr;
652 talloc_set_destructor(pinned_down, pinned_down_destructor);
653 talloc_steal(pinned_down, hdr);
655 return 0;
658 static void
659 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
661 int i;
663 /* smallest value is always at index 0 */
664 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
665 return;
668 /* see if we already know this key */
669 for (i = 0; i < MAX_HOT_KEYS; i++) {
670 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
671 continue;
673 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
674 continue;
676 /* found an entry for this key */
677 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
678 return;
680 ctdb_db->statistics.hot_keys[i].count = hopcount;
681 goto sort_keys;
684 if (ctdb_db->statistics.hot_keys[0].key.dptr != NULL) {
685 talloc_free(ctdb_db->statistics.hot_keys[0].key.dptr);
687 ctdb_db->statistics.hot_keys[0].key.dsize = key.dsize;
688 ctdb_db->statistics.hot_keys[0].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
689 ctdb_db->statistics.hot_keys[0].count = hopcount;
692 sort_keys:
693 for (i = 2; i < MAX_HOT_KEYS; i++) {
694 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
695 hopcount = ctdb_db->statistics.hot_keys[i].count;
696 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
697 ctdb_db->statistics.hot_keys[0].count = hopcount;
699 key = ctdb_db->statistics.hot_keys[i].key;
700 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
701 ctdb_db->statistics.hot_keys[0].key = key;
707 called when a CTDB_REQ_CALL packet comes in
709 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
711 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
712 TDB_DATA data;
713 struct ctdb_reply_call *r;
714 int ret, len;
715 struct ctdb_ltdb_header header;
716 struct ctdb_call *call;
717 struct ctdb_db_context *ctdb_db;
718 int tmp_count, bucket;
720 if (ctdb->methods == NULL) {
721 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
722 return;
726 ctdb_db = find_ctdb_db(ctdb, c->db_id);
727 if (!ctdb_db) {
728 ctdb_send_error(ctdb, hdr, -1,
729 "Unknown database in request. db_id==0x%08x",
730 c->db_id);
731 return;
734 call = talloc(hdr, struct ctdb_call);
735 CTDB_NO_MEMORY_FATAL(ctdb, call);
737 call->call_id = c->callid;
738 call->key.dptr = c->data;
739 call->key.dsize = c->keylen;
740 call->call_data.dptr = c->data + c->keylen;
741 call->call_data.dsize = c->calldatalen;
742 call->reply_data.dptr = NULL;
743 call->reply_data.dsize = 0;
746 /* If this record is pinned down we should defer the
747 request until the pindown times out
749 if (ctdb_db->sticky) {
750 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
751 DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
752 return;
757 /* determine if we are the dmaster for this key. This also
758 fetches the record data (if any), thus avoiding a 2nd fetch of the data
759 if the call will be answered locally */
761 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
762 ctdb_call_input_pkt, ctdb, false);
763 if (ret == -1) {
764 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
765 return;
767 if (ret == -2) {
768 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
769 return;
772 /* Dont do READONLY if we dont have a tracking database */
773 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
774 c->flags &= ~CTDB_WANT_READONLY;
777 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
778 header.flags &= ~CTDB_REC_RO_FLAGS;
779 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
780 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
781 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
782 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
784 /* and clear out the tracking data */
785 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
786 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
790 /* if we are revoking, we must defer all other calls until the revoke
791 * had completed.
793 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
794 talloc_free(data.dptr);
795 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
797 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
798 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
800 talloc_free(call);
801 return;
804 /* if we are not the dmaster and are not hosting any delegations,
805 then send a redirect to the requesting node */
806 if ((header.dmaster != ctdb->pnn)
807 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
808 talloc_free(data.dptr);
809 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
811 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
812 if (ret != 0) {
813 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
815 return;
818 if ( (!(c->flags & CTDB_WANT_READONLY))
819 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
820 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
821 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
822 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
824 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
826 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
827 ctdb_fatal(ctdb, "Failed to start record revoke");
829 talloc_free(data.dptr);
831 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
832 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
834 talloc_free(call);
836 return;
839 /* If this is the first request for delegation. bump rsn and set
840 * the delegations flag
842 if ((c->flags & CTDB_WANT_READONLY)
843 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
844 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
845 header.rsn += 3;
846 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
847 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
848 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
851 if ((c->flags & CTDB_WANT_READONLY)
852 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
853 TDB_DATA tdata;
855 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
856 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
857 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
859 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
860 ctdb_fatal(ctdb, "Failed to store trackingdb data");
862 free(tdata.dptr);
864 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
865 if (ret != 0) {
866 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
869 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
870 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
871 struct ctdb_reply_call);
872 CTDB_NO_MEMORY_FATAL(ctdb, r);
873 r->hdr.destnode = c->hdr.srcnode;
874 r->hdr.reqid = c->hdr.reqid;
875 r->status = 0;
876 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
877 header.rsn -= 2;
878 header.flags |= CTDB_REC_RO_HAVE_READONLY;
879 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
880 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
882 if (data.dsize) {
883 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
886 ctdb_queue_packet(ctdb, &r->hdr);
887 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
888 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
890 talloc_free(r);
891 return;
894 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
895 tmp_count = c->hopcount;
896 bucket = 0;
897 while (tmp_count) {
898 tmp_count >>= 2;
899 bucket++;
901 if (bucket >= MAX_COUNT_BUCKETS) {
902 bucket = MAX_COUNT_BUCKETS - 1;
904 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
905 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
906 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
908 /* If this database supports sticky records, then check if the
909 hopcount is big. If it is it means the record is hot and we
910 should make it sticky.
912 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
913 DEBUG(DEBUG_ERR, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db->db_name, c->hopcount, ctdb->tunable.sticky_duration));
914 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
918 /* if this nodes has done enough consecutive calls on the same record
919 then give them the record
920 or if the node requested an immediate migration
922 if ( c->hdr.srcnode != ctdb->pnn &&
923 ((header.laccessor == c->hdr.srcnode
924 && header.lacount >= ctdb->tunable.max_lacount
925 && ctdb->tunable.max_lacount != 0)
926 || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
927 if (ctdb_db->transaction_active) {
928 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
929 " of key %s while transaction is active\n",
930 (char *)call->key.dptr));
931 } else {
932 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
933 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
934 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
935 talloc_free(data.dptr);
937 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
938 if (ret != 0) {
939 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
941 return;
945 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
946 if (ret != 0) {
947 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
948 call->status = -1;
951 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
952 if (ret != 0) {
953 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
956 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
957 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
958 struct ctdb_reply_call);
959 CTDB_NO_MEMORY_FATAL(ctdb, r);
960 r->hdr.destnode = hdr->srcnode;
961 r->hdr.reqid = hdr->reqid;
962 r->status = call->status;
963 r->datalen = call->reply_data.dsize;
964 if (call->reply_data.dsize) {
965 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
968 ctdb_queue_packet(ctdb, &r->hdr);
970 talloc_free(r);
974 called when a CTDB_REPLY_CALL packet comes in
976 This packet comes in response to a CTDB_REQ_CALL request packet. It
977 contains any reply data from the call
979 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
981 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
982 struct ctdb_call_state *state;
984 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
985 if (state == NULL) {
986 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
987 return;
990 if (hdr->reqid != state->reqid) {
991 /* we found a record but it was the wrong one */
992 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
993 return;
997 /* read only delegation processing */
998 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
999 * delegation since we may need to update the record header
1001 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1002 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1003 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1004 struct ctdb_ltdb_header oldheader;
1005 TDB_DATA key, data, olddata;
1006 int ret;
1008 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1009 goto finished_ro;
1010 return;
1013 key.dsize = state->c->keylen;
1014 key.dptr = state->c->data;
1015 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1016 ctdb_call_input_pkt, ctdb, false);
1017 if (ret == -2) {
1018 return;
1020 if (ret != 0) {
1021 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1022 return;
1025 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1026 if (ret != 0) {
1027 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1028 ctdb_ltdb_unlock(ctdb_db, key);
1029 goto finished_ro;
1032 if (header->rsn <= oldheader.rsn) {
1033 ctdb_ltdb_unlock(ctdb_db, key);
1034 goto finished_ro;
1037 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1038 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1039 ctdb_ltdb_unlock(ctdb_db, key);
1040 goto finished_ro;
1043 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1044 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1045 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1046 if (ret != 0) {
1047 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1048 ctdb_ltdb_unlock(ctdb_db, key);
1049 goto finished_ro;
1052 ctdb_ltdb_unlock(ctdb_db, key);
1054 finished_ro:
1056 state->call->reply_data.dptr = c->data;
1057 state->call->reply_data.dsize = c->datalen;
1058 state->call->status = c->status;
1060 talloc_steal(state, c);
1062 state->state = CTDB_CALL_DONE;
1063 if (state->async.fn) {
1064 state->async.fn(state);
1070 called when a CTDB_REPLY_DMASTER packet comes in
1072 This packet comes in from the lmaster response to a CTDB_REQ_CALL
1073 request packet. It means that the current dmaster wants to give us
1074 the dmaster role
1076 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1078 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1079 struct ctdb_db_context *ctdb_db;
1080 TDB_DATA key, data;
1081 uint32_t record_flags = 0;
1082 size_t len;
1083 int ret;
1085 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1086 if (ctdb_db == NULL) {
1087 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1088 return;
1091 key.dptr = c->data;
1092 key.dsize = c->keylen;
1093 data.dptr = &c->data[key.dsize];
1094 data.dsize = c->datalen;
1095 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1096 + sizeof(uint32_t);
1097 if (len <= c->hdr.length) {
1098 record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
1101 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1102 ctdb_call_input_pkt, ctdb, false);
1103 if (ret == -2) {
1104 return;
1106 if (ret != 0) {
1107 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1108 return;
1111 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1116 called when a CTDB_REPLY_ERROR packet comes in
1118 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1120 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1121 struct ctdb_call_state *state;
1123 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1124 if (state == NULL) {
1125 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1126 ctdb->pnn, hdr->reqid));
1127 return;
1130 if (hdr->reqid != state->reqid) {
1131 /* we found a record but it was the wrong one */
1132 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
1133 return;
1136 talloc_steal(state, c);
1138 state->state = CTDB_CALL_ERROR;
1139 state->errmsg = (char *)c->msg;
1140 if (state->async.fn) {
1141 state->async.fn(state);
1147 destroy a ctdb_call
1149 static int ctdb_call_destructor(struct ctdb_call_state *state)
1151 DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1152 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1153 return 0;
1158 called when a ctdb_call needs to be resent after a reconfigure event
1160 static void ctdb_call_resend(struct ctdb_call_state *state)
1162 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1164 state->generation = ctdb->vnn_map->generation;
1166 /* use a new reqid, in case the old reply does eventually come in */
1167 ctdb_reqid_remove(ctdb, state->reqid);
1168 state->reqid = ctdb_reqid_new(ctdb, state);
1169 state->c->hdr.reqid = state->reqid;
1171 /* update the generation count for this request, so its valid with the new vnn_map */
1172 state->c->hdr.generation = state->generation;
1174 /* send the packet to ourselves, it will be redirected appropriately */
1175 state->c->hdr.destnode = ctdb->pnn;
1177 ctdb_queue_packet(ctdb, &state->c->hdr);
1178 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1182 resend all pending calls on recovery
1184 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1186 struct ctdb_call_state *state, *next;
1187 for (state=ctdb->pending_calls;state;state=next) {
1188 next = state->next;
1189 ctdb_call_resend(state);
1194 this allows the caller to setup a async.fn
1196 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1197 struct timeval t, void *private_data)
1199 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1200 if (state->async.fn) {
1201 state->async.fn(state);
1207 construct an event driven local ctdb_call
1209 this is used so that locally processed ctdb_call requests are processed
1210 in an event driven manner
1212 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1213 struct ctdb_call *call,
1214 struct ctdb_ltdb_header *header,
1215 TDB_DATA *data)
1217 struct ctdb_call_state *state;
1218 struct ctdb_context *ctdb = ctdb_db->ctdb;
1219 int ret;
1221 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1222 CTDB_NO_MEMORY_NULL(ctdb, state);
1224 talloc_steal(state, data->dptr);
1226 state->state = CTDB_CALL_DONE;
1227 state->call = talloc(state, struct ctdb_call);
1228 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1229 *(state->call) = *call;
1230 state->ctdb_db = ctdb_db;
1232 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
1233 if (ret != 0) {
1234 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1237 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1239 return state;
1244 make a remote ctdb call - async send. Called in daemon context.
1246 This constructs a ctdb_call request and queues it for processing.
1247 This call never blocks.
1249 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1250 struct ctdb_call *call,
1251 struct ctdb_ltdb_header *header)
1253 uint32_t len;
1254 struct ctdb_call_state *state;
1255 struct ctdb_context *ctdb = ctdb_db->ctdb;
1257 if (ctdb->methods == NULL) {
1258 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1259 return NULL;
1262 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1263 CTDB_NO_MEMORY_NULL(ctdb, state);
1264 state->call = talloc(state, struct ctdb_call);
1265 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1267 state->reqid = ctdb_reqid_new(ctdb, state);
1268 state->ctdb_db = ctdb_db;
1269 talloc_set_destructor(state, ctdb_call_destructor);
1271 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1272 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1273 struct ctdb_req_call);
1274 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1275 state->c->hdr.destnode = header->dmaster;
1277 /* this limits us to 16k outstanding messages - not unreasonable */
1278 state->c->hdr.reqid = state->reqid;
1279 state->c->flags = call->flags;
1280 state->c->db_id = ctdb_db->db_id;
1281 state->c->callid = call->call_id;
1282 state->c->hopcount = 0;
1283 state->c->keylen = call->key.dsize;
1284 state->c->calldatalen = call->call_data.dsize;
1285 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1286 memcpy(&state->c->data[call->key.dsize],
1287 call->call_data.dptr, call->call_data.dsize);
1288 *(state->call) = *call;
1289 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1290 state->call->key.dptr = &state->c->data[0];
1292 state->state = CTDB_CALL_WAIT;
1293 state->generation = ctdb->vnn_map->generation;
1295 DLIST_ADD(ctdb->pending_calls, state);
1297 ctdb_queue_packet(ctdb, &state->c->hdr);
1299 return state;
1303 make a remote ctdb call - async recv - called in daemon context
1305 This is called when the program wants to wait for a ctdb_call to complete and get the
1306 results. This call will block unless the call has already completed.
1308 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1310 while (state->state < CTDB_CALL_DONE) {
1311 event_loop_once(state->ctdb_db->ctdb->ev);
1313 if (state->state != CTDB_CALL_DONE) {
1314 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1315 talloc_free(state);
1316 return -1;
1319 if (state->call->reply_data.dsize) {
1320 call->reply_data.dptr = talloc_memdup(call,
1321 state->call->reply_data.dptr,
1322 state->call->reply_data.dsize);
1323 call->reply_data.dsize = state->call->reply_data.dsize;
1324 } else {
1325 call->reply_data.dptr = NULL;
1326 call->reply_data.dsize = 0;
1328 call->status = state->call->status;
1329 talloc_free(state);
1330 return 0;
1335 send a keepalive packet to the other node
1337 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1339 struct ctdb_req_keepalive *r;
1341 if (ctdb->methods == NULL) {
1342 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1343 return;
1346 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1347 sizeof(struct ctdb_req_keepalive),
1348 struct ctdb_req_keepalive);
1349 CTDB_NO_MEMORY_FATAL(ctdb, r);
1350 r->hdr.destnode = destnode;
1351 r->hdr.reqid = 0;
1353 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1355 ctdb_queue_packet(ctdb, &r->hdr);
1357 talloc_free(r);
1362 struct revokechild_deferred_call {
1363 struct ctdb_context *ctdb;
1364 struct ctdb_req_header *hdr;
1365 deferred_requeue_fn fn;
1366 void *ctx;
1369 struct revokechild_handle {
1370 struct revokechild_handle *next, *prev;
1371 struct ctdb_context *ctdb;
1372 struct ctdb_db_context *ctdb_db;
1373 struct fd_event *fde;
1374 int status;
1375 int fd[2];
1376 pid_t child;
1377 TDB_DATA key;
1380 struct revokechild_requeue_handle {
1381 struct ctdb_context *ctdb;
1382 struct ctdb_req_header *hdr;
1383 deferred_requeue_fn fn;
1384 void *ctx;
1387 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1388 struct timeval t, void *private_data)
1390 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1392 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1393 talloc_free(requeue_handle);
1396 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1398 struct ctdb_context *ctdb = deferred_call->ctdb;
1399 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1400 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1402 requeue_handle->ctdb = ctdb;
1403 requeue_handle->hdr = deferred_call->hdr;
1404 requeue_handle->fn = deferred_call->fn;
1405 requeue_handle->ctx = deferred_call->ctx;
1406 talloc_steal(requeue_handle, requeue_handle->hdr);
1408 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1409 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
1411 return 0;
1415 static int revokechild_destructor(struct revokechild_handle *rc)
1417 if (rc->fde != NULL) {
1418 talloc_free(rc->fde);
1421 if (rc->fd[0] != -1) {
1422 close(rc->fd[0]);
1424 if (rc->fd[1] != -1) {
1425 close(rc->fd[1]);
1427 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1429 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
1430 return 0;
1433 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1434 uint16_t flags, void *private_data)
1436 struct revokechild_handle *rc = talloc_get_type(private_data,
1437 struct revokechild_handle);
1438 int ret;
1439 char c;
1441 ret = read(rc->fd[0], &c, 1);
1442 if (ret != 1) {
1443 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1444 rc->status = -1;
1445 talloc_free(rc);
1446 return;
1448 if (c != 0) {
1449 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
1450 rc->status = -1;
1451 talloc_free(rc);
1452 return;
1455 talloc_free(rc);
1458 struct ctdb_revoke_state {
1459 struct ctdb_db_context *ctdb_db;
1460 TDB_DATA key;
1461 struct ctdb_ltdb_header *header;
1462 TDB_DATA data;
1463 int count;
1464 int status;
1465 int finished;
1468 static void update_record_cb(struct ctdb_client_control_state *state)
1470 struct ctdb_revoke_state *revoke_state;
1471 int ret;
1472 int32_t res;
1474 if (state == NULL) {
1475 return;
1477 revoke_state = state->async.private_data;
1479 state->async.fn = NULL;
1480 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1481 if ((ret != 0) || (res != 0)) {
1482 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1483 revoke_state->status = -1;
1486 revoke_state->count--;
1487 if (revoke_state->count <= 0) {
1488 revoke_state->finished = 1;
1492 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1494 struct ctdb_revoke_state *revoke_state = private_data;
1495 struct ctdb_client_control_state *state;
1497 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1498 if (state == NULL) {
1499 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1500 revoke_state->status = -1;
1501 return;
1503 state->async.fn = update_record_cb;
1504 state->async.private_data = revoke_state;
1506 revoke_state->count++;
1510 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1511 struct timeval yt, void *private_data)
1513 struct ctdb_revoke_state *state = private_data;
1515 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1516 state->finished = 1;
1517 state->status = -1;
1520 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1522 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1523 int status;
1525 state->ctdb_db = ctdb_db;
1526 state->key = key;
1527 state->header = header;
1528 state->data = data;
1530 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1532 event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
1534 while (state->finished == 0) {
1535 event_loop_once(ctdb->ev);
1538 status = state->status;
1540 if (status == 0) {
1541 struct ctdb_ltdb_header new_header;
1542 TDB_DATA new_data;
1544 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1545 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1546 talloc_free(state);
1547 return -1;
1549 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1550 ctdb_ltdb_unlock(ctdb_db, key);
1551 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
1552 talloc_free(state);
1553 return -1;
1555 header->rsn++;
1556 if (new_header.rsn > header->rsn) {
1557 ctdb_ltdb_unlock(ctdb_db, key);
1558 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
1559 talloc_free(state);
1560 return -1;
1562 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1563 ctdb_ltdb_unlock(ctdb_db, key);
1564 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1565 talloc_free(state);
1566 return -1;
1568 new_header.rsn++;
1569 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1570 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1571 ctdb_ltdb_unlock(ctdb_db, key);
1572 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1573 talloc_free(state);
1574 return -1;
1576 ctdb_ltdb_unlock(ctdb_db, key);
1579 talloc_free(state);
1580 return status;
1584 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1586 TDB_DATA tdata;
1587 struct revokechild_handle *rc;
1588 pid_t parent = getpid();
1589 int ret;
1591 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1592 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1593 header->rsn -= 1;
1595 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1596 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
1597 return -1;
1600 tdata = tdb_fetch(ctdb_db->rottdb, key);
1601 if (tdata.dsize > 0) {
1602 uint8_t *tmp;
1604 tmp = tdata.dptr;
1605 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1606 free(tmp);
1609 rc->status = 0;
1610 rc->ctdb = ctdb;
1611 rc->ctdb_db = ctdb_db;
1612 rc->fd[0] = -1;
1613 rc->fd[1] = -1;
1615 talloc_set_destructor(rc, revokechild_destructor);
1617 rc->key.dsize = key.dsize;
1618 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1619 if (rc->key.dptr == NULL) {
1620 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1621 talloc_free(rc);
1622 return -1;
1625 ret = pipe(rc->fd);
1626 if (ret != 0) {
1627 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1628 talloc_free(rc);
1629 return -1;
1633 rc->child = ctdb_fork(ctdb);
1634 if (rc->child == (pid_t)-1) {
1635 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1636 talloc_free(rc);
1637 return -1;
1640 if (rc->child == 0) {
1641 char c = 0;
1642 close(rc->fd[0]);
1643 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1645 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1646 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1647 c = 1;
1648 goto child_finished;
1651 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
1653 child_finished:
1654 write(rc->fd[1], &c, 1);
1655 /* make sure we die when our parent dies */
1656 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
1657 sleep(5);
1659 _exit(0);
1662 close(rc->fd[1]);
1663 rc->fd[1] = -1;
1664 set_close_on_exec(rc->fd[0]);
1666 /* This is an active revokechild child process */
1667 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
1669 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1670 EVENT_FD_READ, revokechild_handler,
1671 (void *)rc);
1672 if (rc->fde == NULL) {
1673 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1674 talloc_free(rc);
1676 tevent_fd_set_auto_close(rc->fde);
1678 return 0;
1681 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1683 struct revokechild_handle *rc;
1684 struct revokechild_deferred_call *deferred_call;
1686 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1687 if (rc->key.dsize == 0) {
1688 continue;
1690 if (rc->key.dsize != key.dsize) {
1691 continue;
1693 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1694 break;
1698 if (rc == NULL) {
1699 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
1700 return -1;
1703 deferred_call = talloc(rc, struct revokechild_deferred_call);
1704 if (deferred_call == NULL) {
1705 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1706 return -1;
1709 deferred_call->ctdb = ctdb;
1710 deferred_call->hdr = hdr;
1711 deferred_call->fn = fn;
1712 deferred_call->ctx = call_context;
1714 talloc_set_destructor(deferred_call, deferred_call_destructor);
1715 talloc_steal(deferred_call, hdr);
1717 return 0;