libnet: Remove libnet_samsync
[Samba.git] / ctdb / client / ctdb_client.c
blobd399cb551f68a4d0426f341a2949c7e22172c349
1 /*
2 ctdb daemon code
4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/locale.h"
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/time.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
45 allocate a packet for use in client<->daemon communication
47 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
48 TALLOC_CTX *mem_ctx,
49 enum ctdb_operation operation,
50 size_t length, size_t slength,
51 const char *type)
53 int size;
54 struct ctdb_req_header *hdr;
56 length = MAX(length, slength);
57 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
59 hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
60 if (hdr == NULL) {
61 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
62 operation, (unsigned)length));
63 return NULL;
65 talloc_set_name_const(hdr, type);
66 hdr->length = length;
67 hdr->operation = operation;
68 hdr->ctdb_magic = CTDB_MAGIC;
69 hdr->ctdb_version = CTDB_PROTOCOL;
70 hdr->srcnode = ctdb->pnn;
71 if (ctdb->vnn_map) {
72 hdr->generation = ctdb->vnn_map->generation;
75 return hdr;
79 local version of ctdb_call
81 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
82 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
83 TDB_DATA *data, bool updatetdb)
85 struct ctdb_call_info *c;
86 struct ctdb_registered_call *fn;
87 struct ctdb_context *ctdb = ctdb_db->ctdb;
89 c = talloc(ctdb, struct ctdb_call_info);
90 CTDB_NO_MEMORY(ctdb, c);
92 c->key = call->key;
93 c->call_data = &call->call_data;
94 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
95 c->record_data.dsize = data->dsize;
96 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
97 c->new_data = NULL;
98 c->reply_data = NULL;
99 c->status = 0;
100 c->header = header;
102 for (fn=ctdb_db->calls;fn;fn=fn->next) {
103 if (fn->id == call->call_id) break;
105 if (fn == NULL) {
106 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
107 talloc_free(c);
108 return -1;
111 if (fn->fn(c) != 0) {
112 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
113 talloc_free(c);
114 return -1;
117 /* we need to force the record to be written out if this was a remote access */
118 if (c->new_data == NULL) {
119 c->new_data = &c->record_data;
122 if (c->new_data && updatetdb) {
123 /* XXX check that we always have the lock here? */
124 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
125 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
126 talloc_free(c);
127 return -1;
131 if (c->reply_data) {
132 call->reply_data = *c->reply_data;
134 talloc_steal(call, call->reply_data.dptr);
135 talloc_set_name_const(call->reply_data.dptr, __location__);
136 } else {
137 call->reply_data.dptr = NULL;
138 call->reply_data.dsize = 0;
140 call->status = c->status;
142 talloc_free(c);
144 return 0;
149 queue a packet for sending from client to daemon
151 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
153 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
158 called when a CTDB_REPLY_CALL packet comes in in the client
160 This packet comes in response to a CTDB_REQ_CALL request packet. It
161 contains any reply data from the call
163 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
165 struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
166 struct ctdb_client_call_state *state;
168 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
169 if (state == NULL) {
170 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
171 return;
174 if (hdr->reqid != state->reqid) {
175 /* we found a record but it was the wrong one */
176 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
177 return;
180 state->call->reply_data.dptr = c->data;
181 state->call->reply_data.dsize = c->datalen;
182 state->call->status = c->status;
184 talloc_steal(state, c);
186 state->state = CTDB_CALL_DONE;
188 if (state->async.fn) {
189 state->async.fn(state);
193 void ctdb_request_message(struct ctdb_context *ctdb,
194 struct ctdb_req_header *hdr)
196 struct ctdb_req_message_old *c = (struct ctdb_req_message_old *)hdr;
197 TDB_DATA data;
199 data.dsize = c->datalen;
200 data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
201 if (data.dptr == NULL) {
202 DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
203 return;
206 srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
209 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
212 this is called in the client, when data comes in from the daemon
214 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
216 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
217 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
218 TALLOC_CTX *tmp_ctx;
220 /* place the packet as a child of a tmp_ctx. We then use
221 talloc_free() below to free it. If any of the calls want
222 to keep it, then they will steal it somewhere else, and the
223 talloc_free() will be a no-op */
224 tmp_ctx = talloc_new(ctdb);
225 talloc_steal(tmp_ctx, hdr);
227 if (cnt == 0) {
228 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
229 exit(1);
232 if (cnt < sizeof(*hdr)) {
233 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
234 goto done;
236 if (cnt != hdr->length) {
237 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
238 (unsigned)hdr->length, (unsigned)cnt);
239 goto done;
242 if (hdr->ctdb_magic != CTDB_MAGIC) {
243 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
244 goto done;
247 if (hdr->ctdb_version != CTDB_PROTOCOL) {
248 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
249 goto done;
252 switch (hdr->operation) {
253 case CTDB_REPLY_CALL:
254 ctdb_client_reply_call(ctdb, hdr);
255 break;
257 case CTDB_REQ_MESSAGE:
258 ctdb_request_message(ctdb, hdr);
259 break;
261 case CTDB_REPLY_CONTROL:
262 ctdb_client_reply_control(ctdb, hdr);
263 break;
265 default:
266 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
269 done:
270 talloc_free(tmp_ctx);
274 connect to a unix domain socket
276 int ctdb_socket_connect(struct ctdb_context *ctdb)
278 struct sockaddr_un addr;
279 int ret;
281 memset(&addr, 0, sizeof(addr));
282 addr.sun_family = AF_UNIX;
283 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
285 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
286 if (ctdb->daemon.sd == -1) {
287 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
288 return -1;
291 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
292 DEBUG(DEBUG_ERR,
293 (__location__
294 "Failed to connect client socket to daemon (%s)\n",
295 strerror(errno)));
296 close(ctdb->daemon.sd);
297 ctdb->daemon.sd = -1;
298 return -1;
301 ret = set_blocking(ctdb->daemon.sd, false);
302 if (ret != 0) {
303 DEBUG(DEBUG_ERR,
304 (__location__
305 " failed to set socket non-blocking (%s)\n",
306 strerror(errno)));
307 close(ctdb->daemon.sd);
308 ctdb->daemon.sd = -1;
309 return -1;
312 set_close_on_exec(ctdb->daemon.sd);
314 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
315 CTDB_DS_ALIGNMENT,
316 ctdb_client_read_cb, ctdb, "to-ctdbd");
317 return 0;
321 struct ctdb_record_handle {
322 struct ctdb_db_context *ctdb_db;
323 TDB_DATA key;
324 TDB_DATA *data;
325 struct ctdb_ltdb_header header;
330 make a recv call to the local ctdb daemon - called from client context
332 This is called when the program wants to wait for a ctdb_call to complete and get the
333 results. This call will block unless the call has already completed.
335 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
337 if (state == NULL) {
338 return -1;
341 while (state->state < CTDB_CALL_DONE) {
342 tevent_loop_once(state->ctdb_db->ctdb->ev);
344 if (state->state != CTDB_CALL_DONE) {
345 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
346 talloc_free(state);
347 return -1;
350 if (state->call->reply_data.dsize) {
351 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
352 state->call->reply_data.dptr,
353 state->call->reply_data.dsize);
354 call->reply_data.dsize = state->call->reply_data.dsize;
355 } else {
356 call->reply_data.dptr = NULL;
357 call->reply_data.dsize = 0;
359 call->status = state->call->status;
360 talloc_free(state);
362 return call->status;
369 destroy a ctdb_call in client
371 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
373 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
374 return 0;
378 construct an event driven local ctdb_call
380 this is used so that locally processed ctdb_call requests are processed
381 in an event driven manner
383 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
384 struct ctdb_call *call,
385 struct ctdb_ltdb_header *header,
386 TDB_DATA *data)
388 struct ctdb_client_call_state *state;
389 struct ctdb_context *ctdb = ctdb_db->ctdb;
390 int ret;
392 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
393 CTDB_NO_MEMORY_NULL(ctdb, state);
394 state->call = talloc_zero(state, struct ctdb_call);
395 CTDB_NO_MEMORY_NULL(ctdb, state->call);
397 talloc_steal(state, data->dptr);
399 state->state = CTDB_CALL_DONE;
400 *(state->call) = *call;
401 state->ctdb_db = ctdb_db;
403 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
404 if (ret != 0) {
405 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
408 return state;
412 make a ctdb call to the local daemon - async send. Called from client context.
414 This constructs a ctdb_call request and queues it for processing.
415 This call never blocks.
417 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
418 struct ctdb_call *call)
420 struct ctdb_client_call_state *state;
421 struct ctdb_context *ctdb = ctdb_db->ctdb;
422 struct ctdb_ltdb_header header;
423 TDB_DATA data;
424 int ret;
425 size_t len;
426 struct ctdb_req_call_old *c;
428 /* if the domain socket is not yet open, open it */
429 if (ctdb->daemon.sd==-1) {
430 ctdb_socket_connect(ctdb);
433 ret = ctdb_ltdb_lock(ctdb_db, call->key);
434 if (ret != 0) {
435 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
436 return NULL;
439 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
441 if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
442 ret = -1;
445 if (ret == 0 && header.dmaster == ctdb->pnn) {
446 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
447 talloc_free(data.dptr);
448 ctdb_ltdb_unlock(ctdb_db, call->key);
449 return state;
452 ctdb_ltdb_unlock(ctdb_db, call->key);
453 talloc_free(data.dptr);
455 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
456 if (state == NULL) {
457 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
458 return NULL;
460 state->call = talloc_zero(state, struct ctdb_call);
461 if (state->call == NULL) {
462 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
463 return NULL;
466 len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
467 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call_old);
468 if (c == NULL) {
469 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
470 return NULL;
473 state->reqid = reqid_new(ctdb->idr, state);
474 state->ctdb_db = ctdb_db;
475 talloc_set_destructor(state, ctdb_client_call_destructor);
477 c->hdr.reqid = state->reqid;
478 c->flags = call->flags;
479 c->db_id = ctdb_db->db_id;
480 c->callid = call->call_id;
481 c->hopcount = 0;
482 c->keylen = call->key.dsize;
483 c->calldatalen = call->call_data.dsize;
484 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
485 memcpy(&c->data[call->key.dsize],
486 call->call_data.dptr, call->call_data.dsize);
487 *(state->call) = *call;
488 state->call->call_data.dptr = &c->data[call->key.dsize];
489 state->call->key.dptr = &c->data[0];
491 state->state = CTDB_CALL_WAIT;
494 ctdb_client_queue_pkt(ctdb, &c->hdr);
496 return state;
/*
  synchronous ctdb_call: ctdb_call_send() immediately followed by
  ctdb_call_recv() on the state it returned
*/
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
{
	return ctdb_call_recv(ctdb_call_send(ctdb_db, call), call);
}
513 tell the daemon what messaging srvid we will use, and register the message
514 handler function in the client
516 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
517 srvid_handler_fn handler,
518 void *private_data)
520 int res;
521 int32_t status;
523 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
524 CTDB_CONTROL_REGISTER_SRVID, 0,
525 tdb_null, NULL, NULL, &status, NULL, NULL);
526 if (res != 0 || status != 0) {
527 DEBUG(DEBUG_ERR,
528 ("Failed to register srvid %llu\n",
529 (unsigned long long)srvid));
530 return -1;
533 /* also need to register the handler with our own ctdb structure */
534 return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
538 tell the daemon we no longer want a srvid
540 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
541 uint64_t srvid, void *private_data)
543 int res;
544 int32_t status;
546 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid,
547 CTDB_CONTROL_DEREGISTER_SRVID, 0,
548 tdb_null, NULL, NULL, &status, NULL, NULL);
549 if (res != 0 || status != 0) {
550 DEBUG(DEBUG_ERR,
551 ("Failed to deregister srvid %llu\n",
552 (unsigned long long)srvid));
553 return -1;
556 /* also need to register the handler with our own ctdb structure */
557 srvid_deregister(ctdb->srv, srvid, private_data);
558 return 0;
562 send a message - from client context
564 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
565 uint64_t srvid, TDB_DATA data)
567 struct ctdb_req_message_old *r;
568 int len, res;
570 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
571 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
572 len, struct ctdb_req_message_old);
573 CTDB_NO_MEMORY(ctdb, r);
575 r->hdr.destnode = pnn;
576 r->srvid = srvid;
577 r->datalen = data.dsize;
578 memcpy(&r->data[0], data.dptr, data.dsize);
580 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
581 talloc_free(r);
582 return res;
587 cancel a ctdb_fetch_lock operation, releasing the lock
589 static int fetch_lock_destructor(struct ctdb_record_handle *h)
591 ctdb_ltdb_unlock(h->ctdb_db, h->key);
592 return 0;
596 force the migration of a record to this node
598 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
600 struct ctdb_call call;
601 ZERO_STRUCT(call);
602 call.call_id = CTDB_NULL_FUNC;
603 call.key = key;
604 call.flags = CTDB_IMMEDIATE_MIGRATION;
605 return ctdb_call(ctdb_db, &call);
609 try to fetch a readonly copy of a record
611 static int
612 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
614 int ret;
616 struct ctdb_call call;
617 ZERO_STRUCT(call);
619 call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
620 call.call_data.dptr = NULL;
621 call.call_data.dsize = 0;
622 call.key = key;
623 call.flags = CTDB_WANT_READONLY;
624 ret = ctdb_call(ctdb_db, &call);
626 if (ret != 0) {
627 return -1;
629 if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
630 return -1;
633 *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
634 if (*hdr == NULL) {
635 talloc_free(call.reply_data.dptr);
636 return -1;
639 data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
640 data->dptr = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
641 if (data->dptr == NULL) {
642 talloc_free(call.reply_data.dptr);
643 talloc_free(hdr);
644 return -1;
647 return 0;
651 get a lock on a record, and return the records data. Blocks until it gets the lock
653 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
654 TDB_DATA key, TDB_DATA *data)
656 int ret;
657 struct ctdb_record_handle *h;
660 procedure is as follows:
662 1) get the chain lock.
663 2) check if we are dmaster
664 3) if we are the dmaster then return handle
665 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
666 reply from ctdbd
667 5) when we get the reply, goto (1)
670 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
671 if (h == NULL) {
672 return NULL;
675 h->ctdb_db = ctdb_db;
676 h->key = key;
677 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
678 if (h->key.dptr == NULL) {
679 talloc_free(h);
680 return NULL;
682 h->data = data;
684 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
685 (const char *)key.dptr));
687 again:
688 /* step 1 - get the chain lock */
689 ret = ctdb_ltdb_lock(ctdb_db, key);
690 if (ret != 0) {
691 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
692 talloc_free(h);
693 return NULL;
696 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
698 talloc_set_destructor(h, fetch_lock_destructor);
700 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
702 /* when torturing, ensure we test the remote path */
703 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
704 random() % 5 == 0) {
705 h->header.dmaster = (uint32_t)-1;
709 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
711 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
712 ctdb_ltdb_unlock(ctdb_db, key);
713 ret = ctdb_client_force_migration(ctdb_db, key);
714 if (ret != 0) {
715 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
716 talloc_free(h);
717 return NULL;
719 goto again;
722 /* if this is a request for read/write and we have delegations
723 we have to revoke all delegations first
725 if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
726 (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
727 ctdb_ltdb_unlock(ctdb_db, key);
728 ret = ctdb_client_force_migration(ctdb_db, key);
729 if (ret != 0) {
730 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
731 talloc_free(h);
732 return NULL;
734 goto again;
737 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
738 return h;
742 get a readonly lock on a record, and return the records data. Blocks until it gets the lock
744 struct ctdb_record_handle *
745 ctdb_fetch_readonly_lock(
746 struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
747 TDB_DATA key, TDB_DATA *data,
748 int read_only)
750 int ret;
751 struct ctdb_record_handle *h;
752 struct ctdb_ltdb_header *roheader = NULL;
754 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
755 if (h == NULL) {
756 return NULL;
759 h->ctdb_db = ctdb_db;
760 h->key = key;
761 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
762 if (h->key.dptr == NULL) {
763 talloc_free(h);
764 return NULL;
766 h->data = data;
768 data->dptr = NULL;
769 data->dsize = 0;
772 again:
773 talloc_free(roheader);
774 roheader = NULL;
776 talloc_free(data->dptr);
777 data->dptr = NULL;
778 data->dsize = 0;
780 /* Lock the record/chain */
781 ret = ctdb_ltdb_lock(ctdb_db, key);
782 if (ret != 0) {
783 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
784 talloc_free(h);
785 return NULL;
788 talloc_set_destructor(h, fetch_lock_destructor);
790 /* Check if record exists yet in the TDB */
791 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
792 if (ret != 0) {
793 ctdb_ltdb_unlock(ctdb_db, key);
794 ret = ctdb_client_force_migration(ctdb_db, key);
795 if (ret != 0) {
796 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
797 talloc_free(h);
798 return NULL;
800 goto again;
803 /* if this is a request for read/write and we have delegations
804 we have to revoke all delegations first
806 if ((read_only == 0)
807 && (h->header.dmaster == ctdb_db->ctdb->pnn)
808 && (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
809 ctdb_ltdb_unlock(ctdb_db, key);
810 ret = ctdb_client_force_migration(ctdb_db, key);
811 if (ret != 0) {
812 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
813 talloc_free(h);
814 return NULL;
816 goto again;
819 /* if we are dmaster, just return the handle */
820 if (h->header.dmaster == ctdb_db->ctdb->pnn) {
821 return h;
824 if (read_only != 0) {
825 TDB_DATA rodata = {NULL, 0};
827 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
828 || (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
829 return h;
832 ctdb_ltdb_unlock(ctdb_db, key);
833 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
834 if (ret != 0) {
835 DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock: failed. force migration and try again\n"));
836 ret = ctdb_client_force_migration(ctdb_db, key);
837 if (ret != 0) {
838 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
839 talloc_free(h);
840 return NULL;
843 goto again;
846 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
847 ret = ctdb_client_force_migration(ctdb_db, key);
848 if (ret != 0) {
849 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
850 talloc_free(h);
851 return NULL;
854 goto again;
857 ret = ctdb_ltdb_lock(ctdb_db, key);
858 if (ret != 0) {
859 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
860 talloc_free(h);
861 return NULL;
864 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
865 if (ret != 0) {
866 ctdb_ltdb_unlock(ctdb_db, key);
868 ret = ctdb_client_force_migration(ctdb_db, key);
869 if (ret != 0) {
870 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
871 talloc_free(h);
872 return NULL;
875 goto again;
878 return h;
881 /* we are not dmaster and this was not a request for a readonly lock
882 * so unlock the record, migrate it and try again
884 ctdb_ltdb_unlock(ctdb_db, key);
885 ret = ctdb_client_force_migration(ctdb_db, key);
886 if (ret != 0) {
887 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
888 talloc_free(h);
889 return NULL;
891 goto again;
895 store some data to the record that was locked with ctdb_fetch_lock()
897 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
899 if (! ctdb_db_volatile(h->ctdb_db)) {
900 DEBUG(DEBUG_ERR,
901 ("ctdb_record_store prohibited for non-volatile dbs\n"));
902 return -1;
905 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
909 non-locking fetch of a record
911 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
912 TDB_DATA key, TDB_DATA *data)
914 struct ctdb_call call;
915 int ret;
917 call.call_id = CTDB_FETCH_FUNC;
918 call.call_data.dptr = NULL;
919 call.call_data.dsize = 0;
920 call.key = key;
922 ret = ctdb_call(ctdb_db, &call);
924 if (ret == 0) {
925 *data = call.reply_data;
926 talloc_steal(mem_ctx, data->dptr);
929 return ret;
935 called when a control completes or timesout to invoke the callback
936 function the user provided
938 static void invoke_control_callback(struct tevent_context *ev,
939 struct tevent_timer *te,
940 struct timeval t, void *private_data)
942 struct ctdb_client_control_state *state;
943 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
944 int ret;
946 state = talloc_get_type(private_data, struct ctdb_client_control_state);
947 talloc_steal(tmp_ctx, state);
949 ret = ctdb_control_recv(state->ctdb, state, state,
950 NULL,
951 NULL,
952 NULL);
953 if (ret != 0) {
954 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
957 talloc_free(tmp_ctx);
961 called when a CTDB_REPLY_CONTROL packet comes in in the client
963 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
964 contains any reply data from the control
966 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
967 struct ctdb_req_header *hdr)
969 struct ctdb_reply_control_old *c = (struct ctdb_reply_control_old *)hdr;
970 struct ctdb_client_control_state *state;
972 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_client_control_state);
973 if (state == NULL) {
974 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
975 return;
978 if (hdr->reqid != state->reqid) {
979 /* we found a record but it was the wrong one */
980 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
981 return;
984 state->outdata.dptr = c->data;
985 state->outdata.dsize = c->datalen;
986 state->status = c->status;
987 if (c->errorlen) {
988 state->errormsg = talloc_strndup(state,
989 (char *)&c->data[c->datalen],
990 c->errorlen);
993 /* state->outdata now uses resources from c so we don't want c
994 to just dissappear from under us while state is still alive
996 talloc_steal(state, c);
998 state->state = CTDB_CONTROL_DONE;
1000 /* if we had a callback registered for this control, pull the response
1001 and call the callback.
1003 if (state->async.fn) {
1004 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1005 invoke_control_callback, state);
1011 destroy a ctdb_control in client
1013 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
1015 reqid_remove(state->ctdb->idr, state->reqid);
1016 return 0;
1020 /* time out handler for ctdb_control */
1021 static void control_timeout_func(struct tevent_context *ev,
1022 struct tevent_timer *te,
1023 struct timeval t, void *private_data)
1025 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
1027 DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
1028 "dstnode:%u\n", state->reqid, state->c->opcode,
1029 state->c->hdr.destnode));
1031 state->state = CTDB_CONTROL_TIMEOUT;
1033 /* if we had a callback registered for this control, pull the response
1034 and call the callback.
1036 if (state->async.fn) {
1037 tevent_add_timer(state->ctdb->ev, state, timeval_zero(),
1038 invoke_control_callback, state);
1042 /* async version of send control request */
1043 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
1044 uint32_t destnode, uint64_t srvid,
1045 uint32_t opcode, uint32_t flags, TDB_DATA data,
1046 TALLOC_CTX *mem_ctx,
1047 struct timeval *timeout,
1048 char **errormsg)
1050 struct ctdb_client_control_state *state;
1051 size_t len;
1052 struct ctdb_req_control_old *c;
1053 int ret;
1055 if (errormsg) {
1056 *errormsg = NULL;
1059 /* if the domain socket is not yet open, open it */
1060 if (ctdb->daemon.sd==-1) {
1061 ctdb_socket_connect(ctdb);
1064 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1065 CTDB_NO_MEMORY_NULL(ctdb, state);
1067 state->ctdb = ctdb;
1068 state->reqid = reqid_new(ctdb->idr, state);
1069 state->state = CTDB_CONTROL_WAIT;
1070 state->errormsg = NULL;
1072 talloc_set_destructor(state, ctdb_client_control_destructor);
1074 len = offsetof(struct ctdb_req_control_old, data) + data.dsize;
1075 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
1076 len, struct ctdb_req_control_old);
1077 state->c = c;
1078 CTDB_NO_MEMORY_NULL(ctdb, c);
1079 c->hdr.reqid = state->reqid;
1080 c->hdr.destnode = destnode;
1081 c->opcode = opcode;
1082 c->client_id = 0;
1083 c->flags = flags;
1084 c->srvid = srvid;
1085 c->datalen = data.dsize;
1086 if (data.dsize) {
1087 memcpy(&c->data[0], data.dptr, data.dsize);
1090 /* timeout */
1091 if (timeout && !timeval_is_zero(timeout)) {
1092 tevent_add_timer(ctdb->ev, state, *timeout,
1093 control_timeout_func, state);
1096 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1097 if (ret != 0) {
1098 talloc_free(state);
1099 return NULL;
1102 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1103 talloc_free(state);
1104 return NULL;
1107 return state;
1111 /* async version of receive control reply */
1112 int ctdb_control_recv(struct ctdb_context *ctdb,
1113 struct ctdb_client_control_state *state,
1114 TALLOC_CTX *mem_ctx,
1115 TDB_DATA *outdata, int32_t *status, char **errormsg)
1117 TALLOC_CTX *tmp_ctx;
1119 if (status != NULL) {
1120 *status = -1;
1122 if (errormsg != NULL) {
1123 *errormsg = NULL;
1126 if (state == NULL) {
1127 return -1;
1130 /* prevent double free of state */
1131 tmp_ctx = talloc_new(ctdb);
1132 talloc_steal(tmp_ctx, state);
1134 /* loop one event at a time until we either timeout or the control
1135 completes.
1137 while (state->state == CTDB_CONTROL_WAIT) {
1138 tevent_loop_once(ctdb->ev);
1141 if (state->state != CTDB_CONTROL_DONE) {
1142 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1143 if (state->async.fn) {
1144 state->async.fn(state);
1146 talloc_free(tmp_ctx);
1147 return -1;
1150 if (state->errormsg) {
1151 int s = (state->status == 0 ? -1 : state->status);
1152 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1153 if (errormsg) {
1154 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1156 if (state->async.fn) {
1157 state->async.fn(state);
1159 talloc_free(tmp_ctx);
1160 return s;
1163 if (outdata) {
1164 *outdata = state->outdata;
1165 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1168 if (status) {
1169 *status = state->status;
1172 if (state->async.fn) {
1173 state->async.fn(state);
1176 talloc_free(tmp_ctx);
1177 return 0;
1183 send a ctdb control message
1184 timeout specifies how long we should wait for a reply.
1185 if timeout is NULL we wait indefinitely
1187 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
1188 uint32_t opcode, uint32_t flags, TDB_DATA data,
1189 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1190 struct timeval *timeout,
1191 char **errormsg)
1193 struct ctdb_client_control_state *state;
1195 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
1196 flags, data, mem_ctx,
1197 timeout, errormsg);
1199 /* FIXME: Error conditions in ctdb_control_send return NULL without
1200 * setting errormsg. So, there is no way to distinguish between sucess
1201 * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1202 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1203 if (status != NULL) {
1204 *status = 0;
1206 return 0;
1209 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
1210 errormsg);
1217 a process exists call. Returns 0 if process exists, -1 otherwise
1219 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1221 int ret;
1222 TDB_DATA data;
1223 int32_t status;
1225 data.dptr = (uint8_t*)&pid;
1226 data.dsize = sizeof(pid);
1228 ret = ctdb_control(ctdb, destnode, 0,
1229 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
1230 NULL, NULL, &status, NULL, NULL);
1231 if (ret != 0) {
1232 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1233 return -1;
1236 return status;
1240 get remote statistics
1242 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1244 int ret;
1245 TDB_DATA data;
1246 int32_t res;
1248 ret = ctdb_control(ctdb, destnode, 0,
1249 CTDB_CONTROL_STATISTICS, 0, tdb_null,
1250 ctdb, &data, &res, NULL, NULL);
1251 if (ret != 0 || res != 0) {
1252 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1253 return -1;
1256 if (data.dsize != sizeof(struct ctdb_statistics)) {
1257 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1258 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1259 return -1;
1262 *status = *(struct ctdb_statistics *)data.dptr;
1263 talloc_free(data.dptr);
1265 return 0;
1269 * get db statistics
1271 int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1272 TALLOC_CTX *mem_ctx, struct ctdb_db_statistics_old **dbstat)
1274 int ret;
1275 TDB_DATA indata, outdata;
1276 int32_t res;
1277 struct ctdb_db_statistics_old *wire, *s;
1278 char *ptr;
1279 int i;
1281 indata.dptr = (uint8_t *)&dbid;
1282 indata.dsize = sizeof(dbid);
1284 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
1285 0, indata, ctdb, &outdata, &res, NULL, NULL);
1286 if (ret != 0 || res != 0) {
1287 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
1288 return -1;
1291 if (outdata.dsize < offsetof(struct ctdb_db_statistics_old, hot_keys_wire)) {
1292 DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
1293 outdata.dsize,
1294 (long unsigned int)sizeof(struct ctdb_statistics)));
1295 return -1;
1298 s = talloc_zero(mem_ctx, struct ctdb_db_statistics_old);
1299 if (s == NULL) {
1300 talloc_free(outdata.dptr);
1301 CTDB_NO_MEMORY(ctdb, s);
1304 wire = (struct ctdb_db_statistics_old *)outdata.dptr;
1305 memcpy(s, wire, offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
1306 ptr = &wire->hot_keys_wire[0];
1307 for (i=0; i<wire->num_hot_keys; i++) {
1308 s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
1309 if (s->hot_keys[i].key.dptr == NULL) {
1310 talloc_free(outdata.dptr);
1311 CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
1314 memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
1315 ptr += wire->hot_keys[i].key.dsize;
1318 talloc_free(outdata.dptr);
1319 *dbstat = s;
1320 return 0;
1324 shutdown a remote ctdb node
1326 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1328 struct ctdb_client_control_state *state;
1330 state = ctdb_control_send(ctdb, destnode, 0,
1331 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
1332 NULL, &timeout, NULL);
1333 if (state == NULL) {
1334 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1335 return -1;
1338 return 0;
1342 get vnn map from a remote node
1344 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1346 int ret;
1347 TDB_DATA outdata;
1348 int32_t res;
1349 struct ctdb_vnn_map_wire *map;
1351 ret = ctdb_control(ctdb, destnode, 0,
1352 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1353 mem_ctx, &outdata, &res, &timeout, NULL);
1354 if (ret != 0 || res != 0) {
1355 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1356 return -1;
1359 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1360 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1361 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1362 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1363 return -1;
1366 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1367 CTDB_NO_MEMORY(ctdb, *vnnmap);
1368 (*vnnmap)->generation = map->generation;
1369 (*vnnmap)->size = map->size;
1370 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1372 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1373 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1374 talloc_free(outdata.dptr);
1376 return 0;
1381 get the recovery mode of a remote node
1383 struct ctdb_client_control_state *
1384 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1386 return ctdb_control_send(ctdb, destnode, 0,
1387 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1388 mem_ctx, &timeout, NULL);
1391 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1393 int ret;
1394 int32_t res;
1396 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1397 if (ret != 0) {
1398 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1399 return -1;
1402 if (recmode) {
1403 *recmode = (uint32_t)res;
1406 return 0;
1409 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1411 struct ctdb_client_control_state *state;
1413 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1414 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1421 set the recovery mode of a remote node
1423 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1425 int ret;
1426 TDB_DATA data;
1427 int32_t res;
1429 data.dsize = sizeof(uint32_t);
1430 data.dptr = (unsigned char *)&recmode;
1432 ret = ctdb_control(ctdb, destnode, 0,
1433 CTDB_CONTROL_SET_RECMODE, 0, data,
1434 NULL, NULL, &res, &timeout, NULL);
1435 if (ret != 0 || res != 0) {
1436 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1437 return -1;
1440 return 0;
1446 get the recovery master of a remote node
1448 struct ctdb_client_control_state *
1449 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1450 struct timeval timeout, uint32_t destnode)
1452 return ctdb_control_send(ctdb, destnode, 0,
1453 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1454 mem_ctx, &timeout, NULL);
1457 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1459 int ret;
1460 int32_t res;
1462 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1463 if (ret != 0) {
1464 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1465 return -1;
1468 if (recmaster) {
1469 *recmaster = (uint32_t)res;
1472 return 0;
1475 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1477 struct ctdb_client_control_state *state;
1479 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1480 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1485 set the recovery master of a remote node
1487 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1489 int ret;
1490 TDB_DATA data;
1491 int32_t res;
1493 ZERO_STRUCT(data);
1494 data.dsize = sizeof(uint32_t);
1495 data.dptr = (unsigned char *)&recmaster;
1497 ret = ctdb_control(ctdb, destnode, 0,
1498 CTDB_CONTROL_SET_RECMASTER, 0, data,
1499 NULL, NULL, &res, &timeout, NULL);
1500 if (ret != 0 || res != 0) {
1501 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1502 return -1;
1505 return 0;
1510 get a list of databases off a remote node
1512 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1513 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map_old **dbmap)
1515 int ret;
1516 TDB_DATA outdata;
1517 int32_t res;
1519 ret = ctdb_control(ctdb, destnode, 0,
1520 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1521 mem_ctx, &outdata, &res, &timeout, NULL);
1522 if (ret != 0 || res != 0) {
1523 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1524 return -1;
1527 *dbmap = (struct ctdb_dbid_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1528 talloc_free(outdata.dptr);
1530 return 0;
1534 get a list of nodes (vnn and flags ) from a remote node
1536 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1537 struct timeval timeout, uint32_t destnode,
1538 TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1540 int ret;
1541 TDB_DATA outdata;
1542 int32_t res;
1544 ret = ctdb_control(ctdb, destnode, 0,
1545 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1546 mem_ctx, &outdata, &res, &timeout, NULL);
1547 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1548 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1549 return -1;
1552 *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1553 talloc_free(outdata.dptr);
1554 return 0;
1558 load nodes file on a remote node and return as a node map
1560 int ctdb_ctrl_getnodesfile(struct ctdb_context *ctdb,
1561 struct timeval timeout, uint32_t destnode,
1562 TALLOC_CTX *mem_ctx, struct ctdb_node_map_old **nodemap)
1564 int ret;
1565 TDB_DATA outdata;
1566 int32_t res;
1568 ret = ctdb_control(ctdb, destnode, 0,
1569 CTDB_CONTROL_GET_NODES_FILE, 0, tdb_null,
1570 mem_ctx, &outdata, &res, &timeout, NULL);
1571 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1572 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1573 return -1;
1576 *nodemap = (struct ctdb_node_map_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1577 talloc_free(outdata.dptr);
1579 return 0;
1583 drop the transport, reload the nodes file and restart the transport
1585 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1586 struct timeval timeout, uint32_t destnode)
1588 int ret;
1589 int32_t res;
1591 ret = ctdb_control(ctdb, destnode, 0,
1592 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1593 NULL, NULL, &res, &timeout, NULL);
1594 if (ret != 0 || res != 0) {
1595 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1596 return -1;
1599 return 0;
1604 set vnn map on a node
1606 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1607 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1609 int ret;
1610 TDB_DATA data;
1611 int32_t res;
1612 struct ctdb_vnn_map_wire *map;
1613 size_t len;
1615 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1616 map = talloc_size(mem_ctx, len);
1617 CTDB_NO_MEMORY(ctdb, map);
1619 map->generation = vnnmap->generation;
1620 map->size = vnnmap->size;
1621 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1623 data.dsize = len;
1624 data.dptr = (uint8_t *)map;
1626 ret = ctdb_control(ctdb, destnode, 0,
1627 CTDB_CONTROL_SETVNNMAP, 0, data,
1628 NULL, NULL, &res, &timeout, NULL);
1629 if (ret != 0 || res != 0) {
1630 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1631 return -1;
1634 talloc_free(map);
1636 return 0;
1641 async send for pull database
1643 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1644 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1645 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1647 TDB_DATA indata;
1648 struct ctdb_pulldb *pull;
1649 struct ctdb_client_control_state *state;
1651 pull = talloc(mem_ctx, struct ctdb_pulldb);
1652 CTDB_NO_MEMORY_NULL(ctdb, pull);
1654 pull->db_id = dbid;
1655 pull->lmaster = lmaster;
1657 indata.dsize = sizeof(struct ctdb_pulldb);
1658 indata.dptr = (unsigned char *)pull;
1660 state = ctdb_control_send(ctdb, destnode, 0,
1661 CTDB_CONTROL_PULL_DB, 0, indata,
1662 mem_ctx, &timeout, NULL);
1663 talloc_free(pull);
1665 return state;
1669 async recv for pull database
1671 int ctdb_ctrl_pulldb_recv(
1672 struct ctdb_context *ctdb,
1673 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1674 TDB_DATA *outdata)
1676 int ret;
1677 int32_t res;
1679 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1680 if ( (ret != 0) || (res != 0) ){
1681 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1682 return -1;
1685 return 0;
1689 pull all keys and records for a specific database on a node
1691 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1692 uint32_t dbid, uint32_t lmaster,
1693 TALLOC_CTX *mem_ctx, struct timeval timeout,
1694 TDB_DATA *outdata)
1696 struct ctdb_client_control_state *state;
1698 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1699 timeout);
1701 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1706 change dmaster for all keys in the database to the new value
1708 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1709 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1711 int ret;
1712 TDB_DATA indata;
1713 int32_t res;
1715 indata.dsize = 2*sizeof(uint32_t);
1716 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1718 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1719 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1721 ret = ctdb_control(ctdb, destnode, 0,
1722 CTDB_CONTROL_SET_DMASTER, 0, indata,
1723 NULL, NULL, &res, &timeout, NULL);
1724 if (ret != 0 || res != 0) {
1725 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1726 return -1;
1729 return 0;
1733 ping a node, return number of clients connected
1735 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1737 int ret;
1738 int32_t res;
1740 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1741 tdb_null, NULL, NULL, &res, NULL, NULL);
1742 if (ret != 0) {
1743 return -1;
1745 return res;
1748 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb,
1749 struct timeval timeout,
1750 uint32_t destnode,
1751 uint32_t *runstate)
1753 TDB_DATA outdata;
1754 int32_t res;
1755 int ret;
1757 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1758 tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1759 if (ret != 0 || res != 0) {
1760 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1761 return ret != 0 ? ret : res;
1764 if (outdata.dsize != sizeof(uint32_t)) {
1765 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1766 talloc_free(outdata.dptr);
1767 return -1;
1770 if (runstate != NULL) {
1771 *runstate = *(uint32_t *)outdata.dptr;
1773 talloc_free(outdata.dptr);
1775 return 0;
1779 find the real path to a ltdb
1781 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1782 const char **path)
1784 int ret;
1785 int32_t res;
1786 TDB_DATA data;
1788 data.dptr = (uint8_t *)&dbid;
1789 data.dsize = sizeof(dbid);
1791 ret = ctdb_control(ctdb, destnode, 0,
1792 CTDB_CONTROL_GETDBPATH, 0, data,
1793 mem_ctx, &data, &res, &timeout, NULL);
1794 if (ret != 0 || res != 0) {
1795 return -1;
1798 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1799 if ((*path) == NULL) {
1800 return -1;
1803 talloc_free(data.dptr);
1805 return 0;
1809 find the name of a db
1811 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1812 const char **name)
1814 int ret;
1815 int32_t res;
1816 TDB_DATA data;
1818 data.dptr = (uint8_t *)&dbid;
1819 data.dsize = sizeof(dbid);
1821 ret = ctdb_control(ctdb, destnode, 0,
1822 CTDB_CONTROL_GET_DBNAME, 0, data,
1823 mem_ctx, &data, &res, &timeout, NULL);
1824 if (ret != 0 || res != 0) {
1825 return -1;
1828 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1829 if ((*name) == NULL) {
1830 return -1;
1833 talloc_free(data.dptr);
1835 return 0;
1839 get the health status of a db
1841 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1842 struct timeval timeout,
1843 uint32_t destnode,
1844 uint32_t dbid, TALLOC_CTX *mem_ctx,
1845 const char **reason)
1847 int ret;
1848 int32_t res;
1849 TDB_DATA data;
1851 data.dptr = (uint8_t *)&dbid;
1852 data.dsize = sizeof(dbid);
1854 ret = ctdb_control(ctdb, destnode, 0,
1855 CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1856 mem_ctx, &data, &res, &timeout, NULL);
1857 if (ret != 0 || res != 0) {
1858 return -1;
1861 if (data.dsize == 0) {
1862 (*reason) = NULL;
1863 return 0;
1866 (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1867 if ((*reason) == NULL) {
1868 return -1;
1871 talloc_free(data.dptr);
1873 return 0;
1877 * get db sequence number
1879 int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
1880 uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
1882 int ret;
1883 int32_t res;
1884 TDB_DATA data, outdata;
1885 uint8_t buf[sizeof(uint64_t)] = { 0 };
1887 *(uint32_t *)buf = dbid;
1888 data.dptr = buf;
1889 data.dsize = sizeof(uint64_t);
1891 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
1892 0, data, ctdb, &outdata, &res, &timeout, NULL);
1893 if (ret != 0 || res != 0) {
1894 DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
1895 return -1;
1898 if (outdata.dsize != sizeof(uint64_t)) {
1899 DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
1900 talloc_free(outdata.dptr);
1901 return -1;
1904 if (seqnum != NULL) {
1905 *seqnum = *(uint64_t *)outdata.dptr;
1907 talloc_free(outdata.dptr);
1909 return 0;
1913 create a database
1915 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout,
1916 uint32_t destnode, TALLOC_CTX *mem_ctx,
1917 const char *name, uint8_t db_flags, uint32_t *db_id)
1919 int ret;
1920 int32_t res;
1921 TDB_DATA data;
1922 uint32_t opcode;
1924 data.dptr = discard_const(name);
1925 data.dsize = strlen(name)+1;
1927 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
1928 opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
1929 } else if (db_flags & CTDB_DB_FLAGS_REPLICATED) {
1930 opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
1931 } else {
1932 opcode = CTDB_CONTROL_DB_ATTACH;
1935 ret = ctdb_control(ctdb, destnode, 0, opcode, 0, data,
1936 mem_ctx, &data, &res, &timeout, NULL);
1938 if (ret != 0 || res != 0) {
1939 return -1;
1942 if (data.dsize != sizeof(uint32_t)) {
1943 TALLOC_FREE(data.dptr);
1944 return -1;
1946 if (db_id != NULL) {
1947 *db_id = *(uint32_t *)data.dptr;
1949 talloc_free(data.dptr);
1951 return 0;
1955 get debug level on a node
1957 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1959 int ret;
1960 int32_t res;
1961 TDB_DATA data;
1963 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1964 ctdb, &data, &res, NULL, NULL);
1965 if (ret != 0 || res != 0) {
1966 return -1;
1968 if (data.dsize != sizeof(int32_t)) {
1969 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1970 (unsigned)data.dsize));
1971 return -1;
1973 *level = *(int32_t *)data.dptr;
1974 talloc_free(data.dptr);
1975 return 0;
1979 set debug level on a node
1981 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1983 int ret;
1984 int32_t res;
1985 TDB_DATA data;
1987 data.dptr = (uint8_t *)&level;
1988 data.dsize = sizeof(level);
1990 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1991 NULL, NULL, &res, NULL, NULL);
1992 if (ret != 0 || res != 0) {
1993 return -1;
1995 return 0;
2000 get a list of connected nodes
2002 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
2003 struct timeval timeout,
2004 TALLOC_CTX *mem_ctx,
2005 uint32_t *num_nodes)
2007 struct ctdb_node_map_old *map=NULL;
2008 int ret, i;
2009 uint32_t *nodes;
2011 *num_nodes = 0;
2013 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
2014 if (ret != 0) {
2015 return NULL;
2018 nodes = talloc_array(mem_ctx, uint32_t, map->num);
2019 if (nodes == NULL) {
2020 return NULL;
2023 for (i=0;i<map->num;i++) {
2024 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2025 nodes[*num_nodes] = map->nodes[i].pnn;
2026 (*num_nodes)++;
2030 return nodes;
2035 reset remote status
2037 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
2039 int ret;
2040 int32_t res;
2042 ret = ctdb_control(ctdb, destnode, 0,
2043 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
2044 NULL, NULL, &res, NULL, NULL);
2045 if (ret != 0 || res != 0) {
2046 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
2047 return -1;
2049 return 0;
2053 * Get db open flags
2055 int ctdb_ctrl_db_open_flags(struct ctdb_context *ctdb, uint32_t db_id,
2056 int *tdb_flags)
2058 TDB_DATA indata, outdata;
2059 int ret;
2060 int32_t res;
2062 indata.dptr = (uint8_t *)&db_id;
2063 indata.dsize = sizeof(db_id);
2065 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2066 CTDB_CONTROL_DB_OPEN_FLAGS, 0, indata,
2067 ctdb, &outdata, &res, NULL, NULL);
2068 if (ret != 0 || res != 0) {
2069 D_ERR("ctdb control for db open flags failed\n");
2070 return -1;
2073 if (outdata.dsize != sizeof(int32_t)) {
2074 D_ERR(__location__ " expected %zi bytes, received %zi bytes\n",
2075 sizeof(int32_t), outdata.dsize);
2076 talloc_free(outdata.dptr);
2077 return -1;
2080 *tdb_flags = *(int32_t *)outdata.dptr;
2081 talloc_free(outdata.dptr);
2082 return 0;
2086 attach to a specific database - client call
2088 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
2089 struct timeval timeout,
2090 const char *name,
2091 uint8_t db_flags)
2093 struct ctdb_db_context *ctdb_db;
2094 int ret;
2095 int tdb_flags;
2097 ctdb_db = ctdb_db_handle(ctdb, name);
2098 if (ctdb_db) {
2099 return ctdb_db;
2102 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
2103 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
2105 ctdb_db->ctdb = ctdb;
2106 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
2107 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
2109 /* tell ctdb daemon to attach */
2110 ret = ctdb_ctrl_createdb(ctdb, timeout, CTDB_CURRENT_NODE,
2111 ctdb_db, name, db_flags, &ctdb_db->db_id);
2112 if (ret != 0) {
2113 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
2114 talloc_free(ctdb_db);
2115 return NULL;
2118 ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
2119 if (ret != 0) {
2120 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
2121 talloc_free(ctdb_db);
2122 return NULL;
2125 ret = ctdb_ctrl_db_open_flags(ctdb, ctdb_db->db_id, &tdb_flags);
2126 if (ret != 0) {
2127 D_ERR("Failed to get tdb_flags for database '%s'\n", name);
2128 talloc_free(ctdb_db);
2129 return NULL;
2132 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
2133 O_RDWR, 0);
2134 if (ctdb_db->ltdb == NULL) {
2135 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
2136 talloc_free(ctdb_db);
2137 return NULL;
2140 ctdb_db->db_flags = db_flags;
2142 DLIST_ADD(ctdb->db_list, ctdb_db);
2144 /* add well known functions */
2145 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
2146 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
2147 ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
2149 return ctdb_db;
2153 * detach from a specific database - client call
2155 int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
2157 int ret;
2158 int32_t status;
2159 TDB_DATA data;
2161 data.dsize = sizeof(db_id);
2162 data.dptr = (uint8_t *)&db_id;
2164 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
2165 0, data, NULL, NULL, &status, NULL, NULL);
2166 if (ret != 0 || status != 0) {
2167 return -1;
2169 return 0;
2173 setup a call for a database
2175 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
2177 struct ctdb_registered_call *call;
2179 /* register locally */
2180 call = talloc(ctdb_db, struct ctdb_registered_call);
2181 call->fn = fn;
2182 call->id = id;
2184 DLIST_ADD(ctdb_db->calls, call);
2185 return 0;
2189 struct traverse_state {
2190 bool done;
2191 uint32_t count;
2192 ctdb_traverse_func fn;
2193 void *private_data;
2194 bool listemptyrecords;
2198 called on each key during a ctdb_traverse
2200 static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
2202 struct traverse_state *state = (struct traverse_state *)p;
2203 struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
2204 TDB_DATA key;
2206 if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) {
2207 DEBUG(DEBUG_ERR, ("Bad data size %u in traverse_handler\n",
2208 (unsigned)data.dsize));
2209 state->done = true;
2210 return;
2213 key.dsize = d->keylen;
2214 key.dptr = &d->data[0];
2215 data.dsize = d->datalen;
2216 data.dptr = &d->data[d->keylen];
2218 if (key.dsize == 0 && data.dsize == 0) {
2219 /* end of traverse */
2220 state->done = true;
2221 return;
2224 if (!state->listemptyrecords &&
2225 data.dsize == sizeof(struct ctdb_ltdb_header))
2227 /* empty records are deleted records in ctdb */
2228 return;
2231 if (state->fn(key, data, state->private_data) != 0) {
2232 state->done = true;
2235 state->count++;
2239 * start a cluster wide traverse, calling the supplied fn on each record
2240 * return the number of records traversed, or -1 on error
2242 * Extendet variant with a flag to signal whether empty records should
2243 * be listed.
2245 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2246 ctdb_traverse_func fn,
2247 bool withemptyrecords,
2248 void *private_data)
2250 TDB_DATA data;
2251 struct ctdb_traverse_start_ext t;
2252 int32_t status;
2253 int ret;
2254 uint64_t srvid = (getpid() | 0xFLL<<60);
2255 struct traverse_state state;
2257 state.done = false;
2258 state.count = 0;
2259 state.private_data = private_data;
2260 state.fn = fn;
2261 state.listemptyrecords = withemptyrecords;
2263 ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2264 if (ret != 0) {
2265 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2266 return -1;
2269 t.db_id = ctdb_db->db_id;
2270 t.srvid = srvid;
2271 t.reqid = 0;
2272 t.withemptyrecords = withemptyrecords;
2274 data.dptr = (uint8_t *)&t;
2275 data.dsize = sizeof(t);
2277 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2278 data, NULL, NULL, &status, NULL, NULL);
2279 if (ret != 0 || status != 0) {
2280 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2281 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2282 return -1;
2285 while (!state.done) {
2286 tevent_loop_once(ctdb_db->ctdb->ev);
2289 ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2290 if (ret != 0) {
2291 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2292 return -1;
2295 return state.count;
2299 * start a cluster wide traverse, calling the supplied fn on each record
2300 * return the number of records traversed, or -1 on error
2302 * Standard version which does not list the empty records:
2303 * These are considered deleted.
2305 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2307 return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2310 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2312 called on each key during a catdb
2314 int ctdb_dumpdb_record(TDB_DATA key, TDB_DATA data, void *p)
2316 int i;
2317 struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2318 FILE *f = c->f;
2319 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2321 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2322 for (i=0;i<key.dsize;i++) {
2323 if (ISASCII(key.dptr[i])) {
2324 fprintf(f, "%c", key.dptr[i]);
2325 } else {
2326 fprintf(f, "\\%02X", key.dptr[i]);
2329 fprintf(f, "\"\n");
2331 fprintf(f, "dmaster: %u\n", h->dmaster);
2332 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2334 if (c->printlmaster && c->ctdb->vnn_map != NULL) {
2335 fprintf(f, "lmaster: %u\n", ctdb_lmaster(c->ctdb, &key));
2338 if (c->printhash) {
2339 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2342 if (c->printrecordflags) {
2343 fprintf(f, "flags: 0x%08x", h->flags);
2344 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2345 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2346 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2347 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2348 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2349 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2350 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2351 fprintf(f, "\n");
2354 if (c->printdatasize) {
2355 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2356 } else {
2357 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2358 for (i=sizeof(*h);i<data.dsize;i++) {
2359 if (ISASCII(data.dptr[i])) {
2360 fprintf(f, "%c", data.dptr[i]);
2361 } else {
2362 fprintf(f, "\\%02X", data.dptr[i]);
2365 fprintf(f, "\"\n");
2368 fprintf(f, "\n");
2370 return 0;
2374 convenience function to list all keys to stdout
2376 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2377 struct ctdb_dump_db_context *ctx)
2379 return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2380 ctx->printemptyrecords, ctx);
2384 get the pid of a ctdb daemon
2386 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2388 int ret;
2389 int32_t res;
2391 ret = ctdb_control(ctdb, destnode, 0,
2392 CTDB_CONTROL_GET_PID, 0, tdb_null,
2393 NULL, NULL, &res, &timeout, NULL);
2394 if (ret != 0) {
2395 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2396 return -1;
2399 *pid = res;
2401 return 0;
2404 /* Freeze all databases */
2405 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout,
2406 uint32_t destnode)
2408 int ret;
2409 int32_t res;
2411 ret = ctdb_control(ctdb, destnode, 0,
2412 CTDB_CONTROL_FREEZE, 0, tdb_null,
2413 NULL, NULL, &res, &timeout, NULL);
2414 if (ret != 0 || res != 0) {
2415 DEBUG(DEBUG_ERR, ("ctdb_ctrl_freeze_priority failed\n"));
2416 return -1;
2419 return 0;
2423 get pnn of a node, or -1
2425 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2427 int ret;
2428 int32_t res;
2430 ret = ctdb_control(ctdb, destnode, 0,
2431 CTDB_CONTROL_GET_PNN, 0, tdb_null,
2432 NULL, NULL, &res, &timeout, NULL);
2433 if (ret != 0) {
2434 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2435 return -1;
2438 return res;
2443 sent to a node to make it take over an ip address
2445 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2446 uint32_t destnode, struct ctdb_public_ip *ip)
2448 TDB_DATA data;
2449 int ret;
2450 int32_t res;
2452 data.dsize = sizeof(*ip);
2453 data.dptr = (uint8_t *)ip;
2455 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0,
2456 data, NULL, NULL, &res, &timeout, NULL);
2457 if (ret != 0 || res != 0) {
2458 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2459 return -1;
2462 return 0;
2467 sent to a node to make it release an ip address
2469 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2470 uint32_t destnode, struct ctdb_public_ip *ip)
2472 TDB_DATA data;
2473 int ret;
2474 int32_t res;
2476 data.dsize = sizeof(*ip);
2477 data.dptr = (uint8_t *)ip;
2479 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0,
2480 data, NULL, NULL, &res, &timeout, NULL);
2481 if (ret != 0 || res != 0) {
2482 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2483 return -1;
2486 return 0;
2491 get a tunable
2493 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2494 struct timeval timeout,
2495 uint32_t destnode,
2496 const char *name, uint32_t *value)
2498 struct ctdb_control_get_tunable *t;
2499 TDB_DATA data, outdata;
2500 int32_t res;
2501 int ret;
2503 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2504 data.dptr = talloc_size(ctdb, data.dsize);
2505 CTDB_NO_MEMORY(ctdb, data.dptr);
2507 t = (struct ctdb_control_get_tunable *)data.dptr;
2508 t->length = strlen(name)+1;
2509 memcpy(t->name, name, t->length);
2511 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2512 &outdata, &res, &timeout, NULL);
2513 talloc_free(data.dptr);
2514 if (ret != 0 || res != 0) {
2515 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2516 return ret != 0 ? ret : res;
2519 if (outdata.dsize != sizeof(uint32_t)) {
2520 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2521 talloc_free(outdata.dptr);
2522 return -1;
2525 *value = *(uint32_t *)outdata.dptr;
2526 talloc_free(outdata.dptr);
2528 return 0;
2532 set a tunable
2534 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2535 struct timeval timeout,
2536 uint32_t destnode,
2537 const char *name, uint32_t value)
2539 struct ctdb_tunable_old *t;
2540 TDB_DATA data;
2541 int32_t res;
2542 int ret;
2544 data.dsize = offsetof(struct ctdb_tunable_old, name) + strlen(name) + 1;
2545 data.dptr = talloc_size(ctdb, data.dsize);
2546 CTDB_NO_MEMORY(ctdb, data.dptr);
2548 t = (struct ctdb_tunable_old *)data.dptr;
2549 t->length = strlen(name)+1;
2550 memcpy(t->name, name, t->length);
2551 t->value = value;
2553 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2554 NULL, &res, &timeout, NULL);
2555 talloc_free(data.dptr);
2556 if ((ret != 0) || (res == -1)) {
2557 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2558 return -1;
2561 return res;
2565 list tunables
2567 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2568 struct timeval timeout,
2569 uint32_t destnode,
2570 TALLOC_CTX *mem_ctx,
2571 const char ***list, uint32_t *count)
2573 TDB_DATA outdata;
2574 int32_t res;
2575 int ret;
2576 struct ctdb_control_list_tunable *t;
2577 char *p, *s, *ptr;
2579 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2580 mem_ctx, &outdata, &res, &timeout, NULL);
2581 if (ret != 0 || res != 0) {
2582 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2583 return -1;
2586 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2587 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2588 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2589 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2590 talloc_free(outdata.dptr);
2591 return -1;
2594 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2595 CTDB_NO_MEMORY(ctdb, p);
2597 talloc_free(outdata.dptr);
2599 (*list) = NULL;
2600 (*count) = 0;
2602 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2603 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2604 CTDB_NO_MEMORY(ctdb, *list);
2605 (*list)[*count] = talloc_strdup(*list, s);
2606 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2607 (*count)++;
2610 talloc_free(p);
2612 return 0;
2616 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2617 struct timeval timeout, uint32_t destnode,
2618 TALLOC_CTX *mem_ctx,
2619 uint32_t flags,
2620 struct ctdb_public_ip_list_old **ips)
2622 int ret;
2623 TDB_DATA outdata;
2624 int32_t res;
2626 ret = ctdb_control(ctdb, destnode, 0,
2627 CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2628 mem_ctx, &outdata, &res, &timeout, NULL);
2629 if (ret != 0 || res != 0) {
2630 DEBUG(DEBUG_ERR,(__location__
2631 " ctdb_control for getpublicips failed ret:%d res:%d\n",
2632 ret, res));
2633 return -1;
2636 *ips = (struct ctdb_public_ip_list_old *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2637 talloc_free(outdata.dptr);
2639 return 0;
2642 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2643 struct timeval timeout, uint32_t destnode,
2644 TALLOC_CTX *mem_ctx,
2645 struct ctdb_public_ip_list_old **ips)
2647 return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2648 destnode, mem_ctx,
2649 0, ips);
2652 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2653 struct timeval timeout, uint32_t destnode,
2654 TALLOC_CTX *mem_ctx,
2655 const ctdb_sock_addr *addr,
2656 struct ctdb_public_ip_info_old **_info)
2658 int ret;
2659 TDB_DATA indata;
2660 TDB_DATA outdata;
2661 int32_t res;
2662 struct ctdb_public_ip_info_old *info;
2663 uint32_t len;
2664 uint32_t i;
2666 indata.dptr = discard_const_p(uint8_t, addr);
2667 indata.dsize = sizeof(*addr);
2669 ret = ctdb_control(ctdb, destnode, 0,
2670 CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2671 mem_ctx, &outdata, &res, &timeout, NULL);
2672 if (ret != 0 || res != 0) {
2673 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2674 "failed ret:%d res:%d\n",
2675 ret, res));
2676 return -1;
2679 len = offsetof(struct ctdb_public_ip_info_old, ifaces);
2680 if (len > outdata.dsize) {
2681 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2682 "returned invalid data with size %u > %u\n",
2683 (unsigned int)outdata.dsize,
2684 (unsigned int)len));
2685 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2686 return -1;
2689 info = (struct ctdb_public_ip_info_old *)outdata.dptr;
2690 len += info->num*sizeof(struct ctdb_iface);
2692 if (len > outdata.dsize) {
2693 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2694 "returned invalid data with size %u > %u\n",
2695 (unsigned int)outdata.dsize,
2696 (unsigned int)len));
2697 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2698 return -1;
2701 /* make sure we null terminate the returned strings */
2702 for (i=0; i < info->num; i++) {
2703 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2706 *_info = (struct ctdb_public_ip_info_old *)talloc_memdup(mem_ctx,
2707 outdata.dptr,
2708 outdata.dsize);
2709 talloc_free(outdata.dptr);
2710 if (*_info == NULL) {
2711 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2712 "talloc_memdup size %u failed\n",
2713 (unsigned int)outdata.dsize));
2714 return -1;
2717 return 0;
2720 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2721 struct timeval timeout, uint32_t destnode,
2722 TALLOC_CTX *mem_ctx,
2723 struct ctdb_iface_list_old **_ifaces)
2725 int ret;
2726 TDB_DATA outdata;
2727 int32_t res;
2728 struct ctdb_iface_list_old *ifaces;
2729 uint32_t len;
2730 uint32_t i;
2732 ret = ctdb_control(ctdb, destnode, 0,
2733 CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2734 mem_ctx, &outdata, &res, &timeout, NULL);
2735 if (ret != 0 || res != 0) {
2736 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2737 "failed ret:%d res:%d\n",
2738 ret, res));
2739 return -1;
2742 len = offsetof(struct ctdb_iface_list_old, ifaces);
2743 if (len > outdata.dsize) {
2744 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2745 "returned invalid data with size %u > %u\n",
2746 (unsigned int)outdata.dsize,
2747 (unsigned int)len));
2748 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2749 return -1;
2752 ifaces = (struct ctdb_iface_list_old *)outdata.dptr;
2753 len += ifaces->num*sizeof(struct ctdb_iface);
2755 if (len > outdata.dsize) {
2756 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2757 "returned invalid data with size %u > %u\n",
2758 (unsigned int)outdata.dsize,
2759 (unsigned int)len));
2760 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2761 return -1;
2764 /* make sure we null terminate the returned strings */
2765 for (i=0; i < ifaces->num; i++) {
2766 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2769 *_ifaces = (struct ctdb_iface_list_old *)talloc_memdup(mem_ctx,
2770 outdata.dptr,
2771 outdata.dsize);
2772 talloc_free(outdata.dptr);
2773 if (*_ifaces == NULL) {
2774 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2775 "talloc_memdup size %u failed\n",
2776 (unsigned int)outdata.dsize));
2777 return -1;
2780 return 0;
2783 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2784 struct timeval timeout, uint32_t destnode,
2785 TALLOC_CTX *mem_ctx,
2786 const struct ctdb_iface *info)
2788 int ret;
2789 TDB_DATA indata;
2790 int32_t res;
2792 indata.dptr = discard_const_p(uint8_t, info);
2793 indata.dsize = sizeof(*info);
2795 ret = ctdb_control(ctdb, destnode, 0,
2796 CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2797 mem_ctx, NULL, &res, &timeout, NULL);
2798 if (ret != 0 || res != 0) {
2799 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2800 "failed ret:%d res:%d\n",
2801 ret, res));
2802 return -1;
2805 return 0;
2809 set/clear the permanent disabled bit on a remote node
2811 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2812 uint32_t set, uint32_t clear)
2814 int ret;
2815 TDB_DATA data;
2816 struct ctdb_node_map_old *nodemap=NULL;
2817 struct ctdb_node_flag_change c;
2818 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2819 uint32_t recmaster;
2820 uint32_t *nodes;
2823 /* find the recovery master */
2824 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2825 if (ret != 0) {
2826 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2827 talloc_free(tmp_ctx);
2828 return ret;
2832 /* read the node flags from the recmaster */
2833 ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2834 if (ret != 0) {
2835 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2836 talloc_free(tmp_ctx);
2837 return -1;
2839 if (destnode >= nodemap->num) {
2840 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2841 talloc_free(tmp_ctx);
2842 return -1;
2845 c.pnn = destnode;
2846 c.old_flags = nodemap->nodes[destnode].flags;
2847 c.new_flags = c.old_flags;
2848 c.new_flags |= set;
2849 c.new_flags &= ~clear;
2851 data.dsize = sizeof(c);
2852 data.dptr = (unsigned char *)&c;
2854 /* send the flags update to all connected nodes */
2855 nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2857 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2858 nodes, 0,
2859 timeout, false, data,
2860 NULL, NULL,
2861 NULL) != 0) {
2862 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2864 talloc_free(tmp_ctx);
2865 return -1;
2868 talloc_free(tmp_ctx);
2869 return 0;
2874 get all tunables
2876 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2877 struct timeval timeout,
2878 uint32_t destnode,
2879 struct ctdb_tunable_list *tunables)
2881 TDB_DATA outdata;
2882 int ret;
2883 int32_t res;
2885 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2886 &outdata, &res, &timeout, NULL);
2887 if (ret != 0 || res != 0) {
2888 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2889 return -1;
2892 if (outdata.dsize != sizeof(*tunables)) {
2893 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2894 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2895 return -1;
2898 *tunables = *(struct ctdb_tunable_list *)outdata.dptr;
2899 talloc_free(outdata.dptr);
2900 return 0;
2904 add a public address to a node
2906 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2907 struct timeval timeout, uint32_t destnode,
2908 struct ctdb_addr_info_old *pub)
2910 TDB_DATA data;
2911 int32_t res;
2912 int ret;
2914 data.dsize = offsetof(struct ctdb_addr_info_old, iface) + pub->len;
2915 data.dptr = (unsigned char *)pub;
2917 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2918 NULL, &res, &timeout, NULL);
2919 if (ret != 0 || res != 0) {
2920 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2921 return -1;
2924 return 0;
2928 delete a public address from a node
2930 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2931 struct timeval timeout, uint32_t destnode,
2932 struct ctdb_addr_info_old *pub)
2934 TDB_DATA data;
2935 int32_t res;
2936 int ret;
2938 data.dsize = offsetof(struct ctdb_addr_info_old, iface) + pub->len;
2939 data.dptr = (unsigned char *)pub;
2941 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2942 NULL, &res, &timeout, NULL);
2943 if (ret != 0 || res != 0) {
2944 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2945 return -1;
2948 return 0;
2952 send a gratious arp
2954 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2955 struct timeval timeout, uint32_t destnode,
2956 ctdb_sock_addr *addr, const char *ifname)
2958 TDB_DATA data;
2959 int32_t res;
2960 int ret, len;
2961 struct ctdb_addr_info_old *gratious_arp;
2962 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2965 len = strlen(ifname)+1;
2966 gratious_arp = talloc_size(tmp_ctx,
2967 offsetof(struct ctdb_addr_info_old, iface) + len);
2968 CTDB_NO_MEMORY(ctdb, gratious_arp);
2970 gratious_arp->addr = *addr;
2971 gratious_arp->len = len;
2972 memcpy(&gratious_arp->iface[0], ifname, len);
2975 data.dsize = offsetof(struct ctdb_addr_info_old, iface) + len;
2976 data.dptr = (unsigned char *)gratious_arp;
2978 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATUITOUS_ARP, 0, data, NULL,
2979 NULL, &res, &timeout, NULL);
2980 if (ret != 0 || res != 0) {
2981 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2982 talloc_free(tmp_ctx);
2983 return -1;
2986 talloc_free(tmp_ctx);
2987 return 0;
2991 get a list of all tcp tickles that a node knows about for a particular vnn
2993 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2994 struct timeval timeout, uint32_t destnode,
2995 TALLOC_CTX *mem_ctx,
2996 ctdb_sock_addr *addr,
2997 struct ctdb_tickle_list_old **list)
2999 int ret;
3000 TDB_DATA data, outdata;
3001 int32_t status;
3003 data.dptr = (uint8_t*)addr;
3004 data.dsize = sizeof(ctdb_sock_addr);
3006 ret = ctdb_control(ctdb, destnode, 0,
3007 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
3008 mem_ctx, &outdata, &status, NULL, NULL);
3009 if (ret != 0 || status != 0) {
3010 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3011 return -1;
3014 *list = (struct ctdb_tickle_list_old *)outdata.dptr;
3016 return status;
3020 initialise the ctdb daemon for client applications
3022 NOTE: In current code the daemon does not fork. This is for testing purposes only
3023 and to simplify the code.
3025 struct ctdb_context *ctdb_init(struct tevent_context *ev)
3027 int ret;
3028 struct ctdb_context *ctdb;
3030 ctdb = talloc_zero(ev, struct ctdb_context);
3031 if (ctdb == NULL) {
3032 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3033 return NULL;
3035 ctdb->ev = ev;
3036 /* Wrap early to exercise code. */
3037 ret = reqid_init(ctdb, INT_MAX-200, &ctdb->idr);
3038 if (ret != 0) {
3039 DEBUG(DEBUG_ERR, ("reqid_init failed (%s)\n", strerror(ret)));
3040 talloc_free(ctdb);
3041 return NULL;
3044 ret = srvid_init(ctdb, &ctdb->srv);
3045 if (ret != 0) {
3046 DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
3047 talloc_free(ctdb);
3048 return NULL;
3051 ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
3052 if (ret != 0) {
3053 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3054 talloc_free(ctdb);
3055 return NULL;
3058 ctdb->statistics.statistics_start_time = timeval_current();
3060 return ctdb;
3065 set some ctdb flags
3067 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3069 ctdb->flags |= flags;
3073 setup the local socket name
3075 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3077 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3078 CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3080 return 0;
3083 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3085 return ctdb->daemon.name;
3089 return the pnn of this node
3091 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3093 return ctdb->pnn;
3098 get the uptime of a remote node
3100 struct ctdb_client_control_state *
3101 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3103 return ctdb_control_send(ctdb, destnode, 0,
3104 CTDB_CONTROL_UPTIME, 0, tdb_null,
3105 mem_ctx, &timeout, NULL);
3108 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3110 int ret;
3111 int32_t res;
3112 TDB_DATA outdata;
3114 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3115 if (ret != 0 || res != 0) {
3116 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3117 return -1;
3120 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3122 return 0;
3125 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3127 struct ctdb_client_control_state *state;
3129 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3130 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3134 send a control to execute the "recovered" event script on a node
3136 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3138 int ret;
3139 int32_t status;
3141 ret = ctdb_control(ctdb, destnode, 0,
3142 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
3143 NULL, NULL, &status, &timeout, NULL);
3144 if (ret != 0 || status != 0) {
3145 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3146 return -1;
3149 return 0;
3153 callback for the async helpers used when sending the same control
3154 to multiple nodes in parallell.
3156 static void async_callback(struct ctdb_client_control_state *state)
3158 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3159 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3160 int ret;
3161 TDB_DATA outdata;
3162 int32_t res = -1;
3163 uint32_t destnode = state->c->hdr.destnode;
3165 outdata.dsize = 0;
3166 outdata.dptr = NULL;
3168 /* one more node has responded with recmode data */
3169 data->count--;
3171 /* if we failed to push the db, then return an error and let
3172 the main loop try again.
3174 if (state->state != CTDB_CONTROL_DONE) {
3175 if ( !data->dont_log_errors) {
3176 DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3178 data->fail_count++;
3179 if (state->state == CTDB_CONTROL_TIMEOUT) {
3180 res = -ETIME;
3181 } else {
3182 res = -1;
3184 if (data->fail_callback) {
3185 data->fail_callback(ctdb, destnode, res, outdata,
3186 data->callback_data);
3188 return;
3191 state->async.fn = NULL;
3193 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3194 if ((ret != 0) || (res != 0)) {
3195 if ( !data->dont_log_errors) {
3196 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3198 data->fail_count++;
3199 if (data->fail_callback) {
3200 data->fail_callback(ctdb, destnode, res, outdata,
3201 data->callback_data);
3204 if ((ret == 0) && (data->callback != NULL)) {
3205 data->callback(ctdb, destnode, res, outdata,
3206 data->callback_data);
3211 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3213 /* set up the callback functions */
3214 state->async.fn = async_callback;
3215 state->async.private_data = data;
3217 /* one more control to wait for to complete */
3218 data->count++;
3222 /* wait for up to the maximum number of seconds allowed
3223 or until all nodes we expect a response from has replied
3225 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3227 while (data->count > 0) {
3228 tevent_loop_once(ctdb->ev);
3230 if (data->fail_count != 0) {
3231 if (!data->dont_log_errors) {
3232 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
3233 data->fail_count));
3235 return -1;
3237 return 0;
3242 perform a simple control on the listed nodes
3243 The control cannot return data
3245 int ctdb_client_async_control(struct ctdb_context *ctdb,
3246 enum ctdb_controls opcode,
3247 uint32_t *nodes,
3248 uint64_t srvid,
3249 struct timeval timeout,
3250 bool dont_log_errors,
3251 TDB_DATA data,
3252 client_async_callback client_callback,
3253 client_async_callback fail_callback,
3254 void *callback_data)
3256 struct client_async_data *async_data;
3257 struct ctdb_client_control_state *state;
3258 int j, num_nodes;
3260 async_data = talloc_zero(ctdb, struct client_async_data);
3261 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3262 async_data->dont_log_errors = dont_log_errors;
3263 async_data->callback = client_callback;
3264 async_data->fail_callback = fail_callback;
3265 async_data->callback_data = callback_data;
3266 async_data->opcode = opcode;
3268 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3270 /* loop over all nodes and send an async control to each of them */
3271 for (j=0; j<num_nodes; j++) {
3272 uint32_t pnn = nodes[j];
3274 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
3275 0, data, async_data, &timeout, NULL);
3276 if (state == NULL) {
3277 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3278 talloc_free(async_data);
3279 return -1;
3282 ctdb_client_async_add(async_data, state);
3285 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3286 talloc_free(async_data);
3287 return -1;
3290 talloc_free(async_data);
3291 return 0;
3294 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3295 struct ctdb_vnn_map *vnn_map,
3296 TALLOC_CTX *mem_ctx,
3297 bool include_self)
3299 int i, j, num_nodes;
3300 uint32_t *nodes;
3302 for (i=num_nodes=0;i<vnn_map->size;i++) {
3303 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3304 continue;
3306 num_nodes++;
3309 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3310 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3312 for (i=j=0;i<vnn_map->size;i++) {
3313 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3314 continue;
3316 nodes[j++] = vnn_map->map[i];
3319 return nodes;
3322 /* Get list of nodes not including those with flags specified by mask.
3323 * If exclude_pnn is not -1 then exclude that pnn from the list.
3325 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3326 struct ctdb_node_map_old *node_map,
3327 TALLOC_CTX *mem_ctx,
3328 uint32_t mask,
3329 int exclude_pnn)
3331 int i, j, num_nodes;
3332 uint32_t *nodes;
3334 for (i=num_nodes=0;i<node_map->num;i++) {
3335 if (node_map->nodes[i].flags & mask) {
3336 continue;
3338 if (node_map->nodes[i].pnn == exclude_pnn) {
3339 continue;
3341 num_nodes++;
3344 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3345 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3347 for (i=j=0;i<node_map->num;i++) {
3348 if (node_map->nodes[i].flags & mask) {
3349 continue;
3351 if (node_map->nodes[i].pnn == exclude_pnn) {
3352 continue;
3354 nodes[j++] = node_map->nodes[i].pnn;
3357 return nodes;
3360 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3361 struct ctdb_node_map_old *node_map,
3362 TALLOC_CTX *mem_ctx,
3363 bool include_self)
3365 return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3366 include_self ? -1 : ctdb->pnn);
3369 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3370 struct ctdb_node_map_old *node_map,
3371 TALLOC_CTX *mem_ctx,
3372 bool include_self)
3374 return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3375 include_self ? -1 : ctdb->pnn);
3379 this is used to test if a pnn lock exists and if it exists will return
3380 the number of connections that pnn has reported or -1 if that recovery
3381 daemon is not running.
3384 ctdb_read_pnn_lock(int fd, int32_t pnn)
3386 struct flock lock;
3387 char c;
3389 lock.l_type = F_WRLCK;
3390 lock.l_whence = SEEK_SET;
3391 lock.l_start = pnn;
3392 lock.l_len = 1;
3393 lock.l_pid = 0;
3395 if (fcntl(fd, F_GETLK, &lock) != 0) {
3396 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3397 return -1;
3400 if (lock.l_type == F_UNLCK) {
3401 return -1;
3404 if (pread(fd, &c, 1, pnn) == -1) {
3405 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3406 return -1;
3409 return c;
3413 get capabilities of a remote node
3415 struct ctdb_client_control_state *
3416 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3418 return ctdb_control_send(ctdb, destnode, 0,
3419 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
3420 mem_ctx, &timeout, NULL);
3423 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3425 int ret;
3426 int32_t res;
3427 TDB_DATA outdata;
3429 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3430 if ( (ret != 0) || (res != 0) ) {
3431 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3432 return -1;
3435 if (capabilities) {
3436 *capabilities = *((uint32_t *)outdata.dptr);
3439 return 0;
3442 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3444 struct ctdb_client_control_state *state;
3445 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3446 int ret;
3448 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3449 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3450 talloc_free(tmp_ctx);
3451 return ret;
3454 static void get_capabilities_callback(struct ctdb_context *ctdb,
3455 uint32_t node_pnn, int32_t res,
3456 TDB_DATA outdata, void *callback_data)
3458 struct ctdb_node_capabilities *caps =
3459 talloc_get_type(callback_data,
3460 struct ctdb_node_capabilities);
3462 if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
3463 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n", (unsigned)outdata.dsize, outdata.dptr));
3464 return;
3467 if (node_pnn >= talloc_array_length(caps)) {
3468 DEBUG(DEBUG_ERR,
3469 (__location__ " unexpected PNN %u\n", node_pnn));
3470 return;
3473 caps[node_pnn].retrieved = true;
3474 caps[node_pnn].capabilities = *((uint32_t *)outdata.dptr);
3477 struct ctdb_node_capabilities *
3478 ctdb_get_capabilities(struct ctdb_context *ctdb,
3479 TALLOC_CTX *mem_ctx,
3480 struct timeval timeout,
3481 struct ctdb_node_map_old *nodemap)
3483 uint32_t *nodes;
3484 uint32_t i, res;
3485 struct ctdb_node_capabilities *ret;
3487 nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3489 ret = talloc_array(mem_ctx, struct ctdb_node_capabilities,
3490 nodemap->num);
3491 CTDB_NO_MEMORY_NULL(ctdb, ret);
3492 /* Prepopulate the expected PNNs */
3493 for (i = 0; i < talloc_array_length(ret); i++) {
3494 ret[i].retrieved = false;
3497 res = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
3498 nodes, 0, timeout,
3499 false, tdb_null,
3500 get_capabilities_callback, NULL,
3501 ret);
3502 if (res != 0) {
3503 DEBUG(DEBUG_ERR,
3504 (__location__ " Failed to read node capabilities.\n"));
3505 TALLOC_FREE(ret);
3508 return ret;
3511 uint32_t *
3512 ctdb_get_node_capabilities(struct ctdb_node_capabilities *caps,
3513 uint32_t pnn)
3515 if (pnn < talloc_array_length(caps) && caps[pnn].retrieved) {
3516 return &caps[pnn].capabilities;
3519 return NULL;
/*
  Return true if capabilities were retrieved for pnn and every bit in
  capabilities_required is set in them.
 */
bool ctdb_node_has_capabilities(struct ctdb_node_capabilities *caps,
				uint32_t pnn,
				uint32_t capabilities_required)
{
	const uint32_t *have = ctdb_get_node_capabilities(caps, pnn);

	if (have == NULL) {
		return false;
	}

	return (*have & capabilities_required) == capabilities_required;
}
3532 static struct ctdb_server_id server_id_fetch(struct ctdb_context *ctdb, uint32_t reqid)
3534 struct ctdb_server_id id;
3536 id.pid = getpid();
3537 id.task_id = reqid;
3538 id.vnn = ctdb_get_pnn(ctdb);
3539 id.unique_id = id.vnn;
3540 id.unique_id = (id.unique_id << 32) | reqid;
3542 return id;
3545 /* This is basically a copy from Samba's server_id.*. However, a
3546 * dependency chain stops us from using Samba's version, so use a
3547 * renamed copy until a better solution is found. */
3548 static bool ctdb_server_id_equal(struct ctdb_server_id *id1, struct ctdb_server_id *id2)
3550 if (id1->pid != id2->pid) {
3551 return false;
3554 if (id1->task_id != id2->task_id) {
3555 return false;
3558 if (id1->vnn != id2->vnn) {
3559 return false;
3562 if (id1->unique_id != id2->unique_id) {
3563 return false;
3566 return true;
3569 static bool server_id_exists(struct ctdb_context *ctdb,
3570 struct ctdb_server_id *id)
3572 int ret;
3574 ret = ctdb_ctrl_process_exists(ctdb, id->vnn, id->pid);
3575 if (ret == 0) {
3576 return true;
3579 return false;
3582 static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
3583 struct ctdb_g_lock_list **locks)
3585 struct ctdb_g_lock_list *recs;
3587 recs = talloc_zero(mem_ctx, struct ctdb_g_lock_list);
3588 if (recs == NULL) {
3589 return false;
3592 if (data.dsize == 0) {
3593 goto done;
3596 if (data.dsize % sizeof(struct ctdb_g_lock) != 0) {
3597 DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
3598 (unsigned long)data.dsize));
3599 talloc_free(recs);
3600 return false;
3603 recs->num = data.dsize / sizeof(struct ctdb_g_lock);
3604 recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
3605 if (recs->lock == NULL) {
3606 talloc_free(recs);
3607 return false;
3610 done:
3611 if (locks != NULL) {
3612 *locks = recs;
3615 return true;
3619 static bool g_lock_lock(TALLOC_CTX *mem_ctx,
3620 struct ctdb_db_context *ctdb_db,
3621 const char *keyname, uint32_t reqid)
3623 TDB_DATA key, data;
3624 struct ctdb_record_handle *h;
3625 struct ctdb_g_lock_list *locks;
3626 struct ctdb_server_id id;
3627 struct timeval t_start;
3628 int i;
3630 key.dptr = (uint8_t *)discard_const(keyname);
3631 key.dsize = strlen(keyname) + 1;
3633 t_start = timeval_current();
3635 again:
3636 /* Keep trying for an hour. */
3637 if (timeval_elapsed(&t_start) > 3600) {
3638 return false;
3641 h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3642 if (h == NULL) {
3643 return false;
3646 if (!g_lock_parse(h, data, &locks)) {
3647 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3648 talloc_free(data.dptr);
3649 talloc_free(h);
3650 return false;
3653 talloc_free(data.dptr);
3655 id = server_id_fetch(ctdb_db->ctdb, reqid);
3657 i = 0;
3658 while (i < locks->num) {
3659 if (ctdb_server_id_equal(&locks->lock[i].sid, &id)) {
3660 /* Internal error */
3661 talloc_free(h);
3662 return false;
3665 if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].sid)) {
3666 if (i < locks->num-1) {
3667 locks->lock[i] = locks->lock[locks->num-1];
3669 locks->num--;
3670 continue;
3673 /* This entry is locked. */
3674 DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
3675 "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3676 (unsigned long long)id.pid,
3677 id.task_id, id.vnn,
3678 (unsigned long long)id.unique_id));
3679 talloc_free(h);
3680 goto again;
3683 locks->lock = talloc_realloc(locks, locks->lock, struct ctdb_g_lock,
3684 locks->num+1);
3685 if (locks->lock == NULL) {
3686 talloc_free(h);
3687 return false;
3690 locks->lock[locks->num].type = CTDB_G_LOCK_WRITE;
3691 locks->lock[locks->num].sid = id;
3692 locks->num++;
3694 data.dptr = (uint8_t *)locks->lock;
3695 data.dsize = locks->num * sizeof(struct ctdb_g_lock);
3697 if (ctdb_record_store(h, data) != 0) {
3698 DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
3699 "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3700 (unsigned long long)id.pid,
3701 id.task_id, id.vnn,
3702 (unsigned long long)id.unique_id));
3703 talloc_free(h);
3704 return false;
3707 DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
3708 "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
3709 (unsigned long long)id.pid,
3710 id.task_id, id.vnn,
3711 (unsigned long long)id.unique_id));
3713 talloc_free(h);
3714 return true;
3717 static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
3718 struct ctdb_db_context *ctdb_db,
3719 const char *keyname, uint32_t reqid)
3721 TDB_DATA key, data;
3722 struct ctdb_record_handle *h;
3723 struct ctdb_g_lock_list *locks;
3724 struct ctdb_server_id id;
3725 int i;
3726 bool found = false;
3728 key.dptr = (uint8_t *)discard_const(keyname);
3729 key.dsize = strlen(keyname) + 1;
3730 h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
3731 if (h == NULL) {
3732 return false;
3735 if (!g_lock_parse(h, data, &locks)) {
3736 DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
3737 talloc_free(data.dptr);
3738 talloc_free(h);
3739 return false;
3742 talloc_free(data.dptr);
3744 id = server_id_fetch(ctdb_db->ctdb, reqid);
3746 for (i=0; i<locks->num; i++) {
3747 if (ctdb_server_id_equal(&locks->lock[i].sid, &id)) {
3748 if (i < locks->num-1) {
3749 locks->lock[i] = locks->lock[locks->num-1];
3751 locks->num--;
3752 found = true;
3753 break;
3757 if (!found) {
3758 DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
3759 talloc_free(h);
3760 return false;
3763 data.dptr = (uint8_t *)locks->lock;
3764 data.dsize = locks->num * sizeof(struct ctdb_g_lock);
3766 if (ctdb_record_store(h, data) != 0) {
3767 talloc_free(h);
3768 return false;
3771 talloc_free(h);
3772 return true;
/*
 * State of one client-side transaction, created by
 * ctdb_transaction_start().
 */
struct ctdb_transaction_handle {
	/* database the transaction reads from and writes to */
	struct ctdb_db_context *ctdb_db;
	/* handle on g_lock.tdb, where the transaction lock is recorded */
	struct ctdb_db_context *g_lock_db;
	/* key of the lock record: "transaction_db_0x<db_id>" */
	char *lock_name;
	/* request id registered for this handle; identifies the lock owner */
	uint32_t reqid;
	/*
	 * we store reads and writes done under a transaction:
	 * - one list stores both reads and writes (m_all)
	 * - the other just writes (m_write)
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
};
3790 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3792 g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
3793 reqid_remove(h->ctdb_db->ctdb->idr, h->reqid);
3794 return 0;
3799 * start a transaction on a database
3801 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3802 TALLOC_CTX *mem_ctx)
3804 struct ctdb_transaction_handle *h;
3806 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3807 if (h == NULL) {
3808 DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
3809 return NULL;
3812 h->ctdb_db = ctdb_db;
3813 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
3814 (unsigned int)ctdb_db->db_id);
3815 if (h->lock_name == NULL) {
3816 DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
3817 talloc_free(h);
3818 return NULL;
3821 h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
3822 "g_lock.tdb", 0);
3823 if (!h->g_lock_db) {
3824 DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
3825 talloc_free(h);
3826 return NULL;
3829 h->reqid = reqid_new(h->ctdb_db->ctdb->idr, h);
3831 if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
3832 DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
3833 talloc_free(h);
3834 return NULL;
3837 talloc_set_destructor(h, ctdb_transaction_destructor);
3838 return h;
3842 * fetch a record inside a transaction
3844 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3845 TALLOC_CTX *mem_ctx,
3846 TDB_DATA key, TDB_DATA *data)
3848 struct ctdb_ltdb_header header;
3849 int ret;
3851 ZERO_STRUCT(header);
3853 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3854 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3855 /* record doesn't exist yet */
3856 *data = tdb_null;
3857 ret = 0;
3860 if (ret != 0) {
3861 return ret;
3864 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3865 if (h->m_all == NULL) {
3866 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3867 return -1;
3870 return 0;
3874 * stores a record inside a transaction
3876 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3877 TDB_DATA key, TDB_DATA data)
3879 TALLOC_CTX *tmp_ctx = talloc_new(h);
3880 struct ctdb_ltdb_header header;
3881 TDB_DATA olddata;
3882 int ret;
3884 /* we need the header so we can update the RSN */
3885 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3886 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3887 /* the record doesn't exist - create one with us as dmaster.
3888 This is only safe because we are in a transaction and this
3889 is a persistent database */
3890 ZERO_STRUCT(header);
3891 } else if (ret != 0) {
3892 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3893 talloc_free(tmp_ctx);
3894 return ret;
3897 if (data.dsize == olddata.dsize &&
3898 memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
3899 header.rsn != 0) {
3900 /* save writing the same data */
3901 talloc_free(tmp_ctx);
3902 return 0;
3905 header.dmaster = h->ctdb_db->ctdb->pnn;
3906 header.rsn++;
3908 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3909 if (h->m_all == NULL) {
3910 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3911 talloc_free(tmp_ctx);
3912 return -1;
3915 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3916 if (h->m_write == NULL) {
3917 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3918 talloc_free(tmp_ctx);
3919 return -1;
3922 talloc_free(tmp_ctx);
3923 return 0;
3926 static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
3928 const char *keyname = CTDB_DB_SEQNUM_KEY;
3929 TDB_DATA key, data;
3930 struct ctdb_ltdb_header header;
3931 int ret;
3933 key.dptr = (uint8_t *)discard_const(keyname);
3934 key.dsize = strlen(keyname) + 1;
3936 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
3937 if (ret != 0) {
3938 *seqnum = 0;
3939 return 0;
3942 if (data.dsize == 0) {
3943 *seqnum = 0;
3944 return 0;
3947 if (data.dsize != sizeof(*seqnum)) {
3948 DEBUG(DEBUG_ERR, (__location__ " Invalid data received len=%zi\n",
3949 data.dsize));
3950 talloc_free(data.dptr);
3951 return -1;
3954 *seqnum = *(uint64_t *)data.dptr;
3955 talloc_free(data.dptr);
3957 return 0;
3961 static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
3962 uint64_t seqnum)
3964 const char *keyname = CTDB_DB_SEQNUM_KEY;
3965 TDB_DATA key, data;
3967 key.dptr = (uint8_t *)discard_const(keyname);
3968 key.dsize = strlen(keyname) + 1;
3970 data.dptr = (uint8_t *)&seqnum;
3971 data.dsize = sizeof(seqnum);
3973 return ctdb_transaction_store(h, key, data);
3978 * commit a transaction
3980 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3982 int ret;
3983 uint64_t old_seqnum, new_seqnum;
3984 int32_t status;
3985 struct timeval timeout;
3987 if (h->m_write == NULL) {
3988 /* no changes were made */
3989 talloc_free(h);
3990 return 0;
3993 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
3994 if (ret != 0) {
3995 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
3996 ret = -1;
3997 goto done;
4000 new_seqnum = old_seqnum + 1;
4001 ret = ctdb_store_db_seqnum(h, new_seqnum);
4002 if (ret != 0) {
4003 DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
4004 ret = -1;
4005 goto done;
4008 again:
4009 timeout = timeval_current_ofs(30,0);
4010 ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
4011 h->ctdb_db->db_id,
4012 CTDB_CONTROL_TRANS3_COMMIT, 0,
4013 ctdb_marshall_finish(h->m_write), NULL, NULL,
4014 &status, &timeout, NULL);
4015 if (ret != 0 || status != 0) {
4017 * TRANS3_COMMIT control will only fail if recovery has been
4018 * triggered. Check if the database has been updated or not.
4020 ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
4021 if (ret != 0) {
4022 DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
4023 goto done;
4026 if (new_seqnum == old_seqnum) {
4027 /* Database not yet updated, try again */
4028 goto again;
4031 if (new_seqnum != (old_seqnum + 1)) {
4032 DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
4033 (long long unsigned)new_seqnum,
4034 (long long unsigned)old_seqnum));
4035 ret = -1;
4036 goto done;
4040 ret = 0;
4042 done:
4043 talloc_free(h);
4044 return ret;
/*
 * cancel a transaction
 *
 * Discards everything collected in the handle by freeing it; nothing is
 * sent to the daemon.  For a handle obtained from
 * ctdb_transaction_start() the destructor installed there runs as part
 * of the free.  Always returns 0.
 */
int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
{
	talloc_free(h);

	return 0;
}
4058 recovery daemon ping to main daemon
4060 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4062 int ret;
4063 int32_t res;
4065 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
4066 ctdb, NULL, &res, NULL, NULL);
4067 if (ret != 0 || res != 0) {
4068 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4069 return -1;
4072 return 0;
4076 tell the main daemon how long it took to lock the reclock file
4078 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4080 int ret;
4081 int32_t res;
4082 TDB_DATA data;
4084 data.dptr = (uint8_t *)&latency;
4085 data.dsize = sizeof(latency);
4087 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
4088 ctdb, NULL, &res, NULL, NULL);
4089 if (ret != 0 || res != 0) {
4090 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4091 return -1;
4094 return 0;
4098 get the name of the reclock file
4100 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4101 uint32_t destnode, TALLOC_CTX *mem_ctx,
4102 const char **name)
4104 int ret;
4105 int32_t res;
4106 TDB_DATA data;
4108 ret = ctdb_control(ctdb, destnode, 0,
4109 CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null,
4110 mem_ctx, &data, &res, &timeout, NULL);
4111 if (ret != 0 || res != 0) {
4112 return -1;
4115 if (data.dsize == 0) {
4116 *name = NULL;
4117 } else {
4118 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4120 talloc_free(data.dptr);
4122 return 0;
4126 stop a node
4128 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4130 int ret;
4131 int32_t res;
4133 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null,
4134 ctdb, NULL, &res, &timeout, NULL);
4135 if (ret != 0 || res != 0) {
4136 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4137 return -1;
4140 return 0;
4144 continue a node
4146 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4148 int ret;
4150 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null,
4151 ctdb, NULL, NULL, &timeout, NULL);
4152 if (ret != 0) {
4153 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4154 return -1;
4157 return 0;
4161 set the lmaster role for a node
4163 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4165 int ret;
4166 TDB_DATA data;
4167 int32_t res;
4169 data.dsize = sizeof(lmasterrole);
4170 data.dptr = (uint8_t *)&lmasterrole;
4172 ret = ctdb_control(ctdb, destnode, 0,
4173 CTDB_CONTROL_SET_LMASTERROLE, 0, data,
4174 NULL, NULL, &res, &timeout, NULL);
4175 if (ret != 0 || res != 0) {
4176 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4177 return -1;
4180 return 0;
4184 set the recmaster role for a node
4186 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4188 int ret;
4189 TDB_DATA data;
4190 int32_t res;
4192 data.dsize = sizeof(recmasterrole);
4193 data.dptr = (uint8_t *)&recmasterrole;
4195 ret = ctdb_control(ctdb, destnode, 0,
4196 CTDB_CONTROL_SET_RECMASTERROLE, 0, data,
4197 NULL, NULL, &res, &timeout, NULL);
4198 if (ret != 0 || res != 0) {
4199 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4200 return -1;
4203 return 0;
4206 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout,
4207 uint32_t destnode, struct ctdb_ban_state *bantime)
4209 int ret;
4210 TDB_DATA data;
4211 int32_t res;
4213 data.dsize = sizeof(*bantime);
4214 data.dptr = (uint8_t *)bantime;
4216 ret = ctdb_control(ctdb, destnode, 0,
4217 CTDB_CONTROL_SET_BAN_STATE, 0, data,
4218 NULL, NULL, &res, &timeout, NULL);
4219 if (ret != 0 || res != 0) {
4220 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4221 return -1;
4224 return 0;
4228 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout,
4229 uint32_t destnode, TALLOC_CTX *mem_ctx,
4230 struct ctdb_ban_state **bantime)
4232 int ret;
4233 TDB_DATA outdata;
4234 int32_t res;
4235 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4237 ret = ctdb_control(ctdb, destnode, 0,
4238 CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4239 tmp_ctx, &outdata, &res, &timeout, NULL);
4240 if (ret != 0 || res != 0) {
4241 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4242 talloc_free(tmp_ctx);
4243 return -1;
4246 *bantime = (struct ctdb_ban_state *)talloc_steal(mem_ctx, outdata.dptr);
4247 talloc_free(tmp_ctx);
4249 return 0;
4252 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb,
4253 struct timeval timeout, uint32_t destnode,
4254 TALLOC_CTX *mem_ctx,
4255 struct ctdb_statistics_list_old **stats)
4257 int ret;
4258 TDB_DATA outdata;
4259 int32_t res;
4261 ret = ctdb_control(ctdb, destnode, 0,
4262 CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null,
4263 mem_ctx, &outdata, &res, &timeout, NULL);
4264 if (ret != 0 || res != 0 || outdata.dsize == 0) {
4265 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4266 return -1;
4269 *stats = (struct ctdb_statistics_list_old *)talloc_memdup(mem_ctx,
4270 outdata.dptr,
4271 outdata.dsize);
4272 talloc_free(outdata.dptr);
4274 return 0;
4277 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4279 if (h == NULL) {
4280 return NULL;
4283 return &h->header;
4287 struct ctdb_client_control_state *
4288 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4290 struct ctdb_client_control_state *handle;
4291 struct ctdb_marshall_buffer *m;
4292 struct ctdb_rec_data_old *rec;
4293 TDB_DATA outdata;
4295 m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4296 if (m == NULL) {
4297 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4298 return NULL;
4301 m->db_id = ctdb_db->db_id;
4303 rec = ctdb_marshall_record(m, 0, key, header, data);
4304 if (rec == NULL) {
4305 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4306 talloc_free(m);
4307 return NULL;
4309 m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4310 if (m == NULL) {
4311 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4312 talloc_free(m);
4313 return NULL;
4315 m->count++;
4316 memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4319 outdata.dptr = (uint8_t *)m;
4320 outdata.dsize = talloc_get_size(m);
4322 handle = ctdb_control_send(ctdb, destnode, 0,
4323 CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4324 mem_ctx, &timeout, NULL);
4325 talloc_free(m);
4326 return handle;
4329 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4331 int ret;
4332 int32_t res;
4334 ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4335 if ( (ret != 0) || (res != 0) ){
4336 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4337 return -1;
4340 return 0;
4344 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4346 struct ctdb_client_control_state *state;
4348 state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4349 return ctdb_ctrl_updaterecord_recv(ctdb, state);
4358 set a database to be readonly
4360 struct ctdb_client_control_state *
4361 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4363 TDB_DATA data;
4365 data.dptr = (uint8_t *)&dbid;
4366 data.dsize = sizeof(dbid);
4368 return ctdb_control_send(ctdb, destnode, 0,
4369 CTDB_CONTROL_SET_DB_READONLY, 0, data,
4370 ctdb, NULL, NULL);
4373 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4375 int ret;
4376 int32_t res;
4378 ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4379 if (ret != 0 || res != 0) {
4380 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed ret:%d res:%d\n", ret, res));
4381 return -1;
4384 return 0;
/*
 * Synchronously mark database dbid read-only on destnode: the _send
 * call immediately followed by the _recv call.
 *
 * Returns 0 on success, -1 on failure.
 */
int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
{
	return ctdb_ctrl_set_db_readonly_recv(
		ctdb, ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid));
}
4396 set a database to be sticky
4398 struct ctdb_client_control_state *
4399 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4401 TDB_DATA data;
4403 data.dptr = (uint8_t *)&dbid;
4404 data.dsize = sizeof(dbid);
4406 return ctdb_control_send(ctdb, destnode, 0,
4407 CTDB_CONTROL_SET_DB_STICKY, 0, data,
4408 ctdb, NULL, NULL);
4411 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4413 int ret;
4414 int32_t res;
4416 ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4417 if (ret != 0 || res != 0) {
4418 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed ret:%d res:%d\n", ret, res));
4419 return -1;
4422 return 0;
/*
 * Synchronously mark database dbid sticky on destnode: the _send call
 * immediately followed by the _recv call.
 *
 * Returns 0 on success, -1 on failure.
 */
int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
{
	return ctdb_ctrl_set_db_sticky_recv(
		ctdb, ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid));
}