auth/credentials: don't ignore "client use kerberos" and --use-kerberos for machine...
[Samba.git] / ctdb / client / client_db.c
blob0b06d6e5e520a55a81e24b451c86832f4639dd39
1 /*
2 CTDB client code
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
40 struct tdb_context *client_db_tdb(struct ctdb_db_context *db)
42 return db->ltdb->tdb;
45 static struct ctdb_db_context *client_db_handle(
46 struct ctdb_client_context *client,
47 const char *db_name)
49 struct ctdb_db_context *db;
51 for (db = client->db; db != NULL; db = db->next) {
52 if (strcmp(db_name, db->db_name) == 0) {
53 return db;
57 return NULL;
60 static bool ctdb_db_persistent(struct ctdb_db_context *db)
62 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
63 return true;
65 return false;
68 static bool ctdb_db_replicated(struct ctdb_db_context *db)
70 if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
71 return true;
73 return false;
76 static bool ctdb_db_volatile(struct ctdb_db_context *db)
78 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT ||
79 db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
80 return false;
82 return true;
/*
 * State carried across the asynchronous steps that apply the READONLY
 * and STICKY flags to a database.
 */
struct ctdb_set_db_flags_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	uint32_t db_id;
	uint8_t db_flags;
	/* Set once the corresponding control has completed or is not needed */
	bool readonly_done;
	bool sticky_done;
	/* Node list built from the nodemap reply, and its length */
	uint32_t *pnn_list;
	int count;
};

/* Completion callbacks for the individual steps */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
100 static struct tevent_req *ctdb_set_db_flags_send(
101 TALLOC_CTX *mem_ctx,
102 struct tevent_context *ev,
103 struct ctdb_client_context *client,
104 uint32_t destnode, struct timeval timeout,
105 uint32_t db_id, uint8_t db_flags)
107 struct tevent_req *req, *subreq;
108 struct ctdb_set_db_flags_state *state;
109 struct ctdb_req_control request;
111 req = tevent_req_create(mem_ctx, &state,
112 struct ctdb_set_db_flags_state);
113 if (req == NULL) {
114 return NULL;
117 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
118 tevent_req_done(req);
119 return tevent_req_post(req, ev);
122 state->ev = ev;
123 state->client = client;
124 state->timeout = timeout;
125 state->db_id = db_id;
126 state->db_flags = db_flags;
128 ctdb_req_control_get_nodemap(&request);
129 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
130 &request);
131 if (tevent_req_nomem(subreq, req)) {
132 return tevent_req_post(req, ev);
134 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
136 return req;
139 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
141 struct tevent_req *req = tevent_req_callback_data(
142 subreq, struct tevent_req);
143 struct ctdb_set_db_flags_state *state = tevent_req_data(
144 req, struct ctdb_set_db_flags_state);
145 struct ctdb_req_control request;
146 struct ctdb_reply_control *reply;
147 struct ctdb_node_map *nodemap;
148 int ret;
149 bool status;
151 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
152 TALLOC_FREE(subreq);
153 if (! status) {
154 DEBUG(DEBUG_ERR,
155 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
156 state->db_id, ret));
157 tevent_req_error(req, ret);
158 return;
161 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
162 talloc_free(reply);
163 if (ret != 0) {
164 DEBUG(DEBUG_ERR,
165 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
166 state->db_id, ret));
167 tevent_req_error(req, ret);
168 return;
171 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
172 state, &state->pnn_list);
173 talloc_free(nodemap);
174 if (state->count <= 0) {
175 DEBUG(DEBUG_ERR,
176 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
177 state->db_id, state->count));
178 tevent_req_error(req, ENOMEM);
179 return;
182 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
183 ctdb_req_control_set_db_readonly(&request, state->db_id);
184 subreq = ctdb_client_control_multi_send(
185 state, state->ev, state->client,
186 state->pnn_list, state->count,
187 state->timeout, &request);
188 if (tevent_req_nomem(subreq, req)) {
189 return;
191 tevent_req_set_callback(subreq,
192 ctdb_set_db_flags_readonly_done, req);
193 } else {
194 state->readonly_done = true;
197 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
198 ctdb_req_control_set_db_sticky(&request, state->db_id);
199 subreq = ctdb_client_control_multi_send(
200 state, state->ev, state->client,
201 state->pnn_list, state->count,
202 state->timeout, &request);
203 if (tevent_req_nomem(subreq, req)) {
204 return;
206 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
207 req);
208 } else {
209 state->sticky_done = true;
213 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
215 struct tevent_req *req = tevent_req_callback_data(
216 subreq, struct tevent_req);
217 struct ctdb_set_db_flags_state *state = tevent_req_data(
218 req, struct ctdb_set_db_flags_state);
219 int ret;
220 bool status;
222 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
223 NULL);
224 TALLOC_FREE(subreq);
225 if (! status) {
226 DEBUG(DEBUG_ERR,
227 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
228 state->db_id, ret));
229 tevent_req_error(req, ret);
230 return;
233 state->readonly_done = true;
235 if (state->readonly_done && state->sticky_done) {
236 tevent_req_done(req);
240 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
242 struct tevent_req *req = tevent_req_callback_data(
243 subreq, struct tevent_req);
244 struct ctdb_set_db_flags_state *state = tevent_req_data(
245 req, struct ctdb_set_db_flags_state);
246 int ret;
247 bool status;
249 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
250 NULL);
251 TALLOC_FREE(subreq);
252 if (! status) {
253 DEBUG(DEBUG_ERR,
254 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
255 state->db_id, ret));
256 tevent_req_error(req, ret);
257 return;
260 state->sticky_done = true;
262 if (state->readonly_done && state->sticky_done) {
263 tevent_req_done(req);
/*
 * Collect the result of ctdb_set_db_flags_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
/* State carried across the asynchronous steps of attaching a database. */
struct ctdb_attach_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Node that all attach-related controls are sent to */
	uint32_t destnode;
	uint8_t db_flags;
	/* Handle being built, or the existing handle if already attached */
	struct ctdb_db_context *db;
};

/* Completion callbacks, listed in the order the steps run */
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
295 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
296 struct tevent_context *ev,
297 struct ctdb_client_context *client,
298 struct timeval timeout,
299 const char *db_name, uint8_t db_flags)
301 struct tevent_req *req, *subreq;
302 struct ctdb_attach_state *state;
303 struct ctdb_req_control request;
305 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
306 if (req == NULL) {
307 return NULL;
310 state->db = client_db_handle(client, db_name);
311 if (state->db != NULL) {
312 tevent_req_done(req);
313 return tevent_req_post(req, ev);
316 state->ev = ev;
317 state->client = client;
318 state->timeout = timeout;
319 state->destnode = ctdb_client_pnn(client);
320 state->db_flags = db_flags;
322 state->db = talloc_zero(client, struct ctdb_db_context);
323 if (tevent_req_nomem(state->db, req)) {
324 return tevent_req_post(req, ev);
327 state->db->db_name = talloc_strdup(state->db, db_name);
328 if (tevent_req_nomem(state->db, req)) {
329 return tevent_req_post(req, ev);
332 state->db->db_flags = db_flags;
334 if (ctdb_db_persistent(state->db)) {
335 ctdb_req_control_db_attach_persistent(&request,
336 state->db->db_name);
337 } else if (ctdb_db_replicated(state->db)) {
338 ctdb_req_control_db_attach_replicated(&request,
339 state->db->db_name);
340 } else {
341 ctdb_req_control_db_attach(&request, state->db->db_name);
344 subreq = ctdb_client_control_send(state, state->ev, state->client,
345 state->destnode, state->timeout,
346 &request);
347 if (tevent_req_nomem(subreq, req)) {
348 return tevent_req_post(req, ev);
350 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
352 return req;
355 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
357 struct tevent_req *req = tevent_req_callback_data(
358 subreq, struct tevent_req);
359 struct ctdb_attach_state *state = tevent_req_data(
360 req, struct ctdb_attach_state);
361 struct ctdb_req_control request;
362 struct ctdb_reply_control *reply;
363 bool status;
364 int ret;
366 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
367 TALLOC_FREE(subreq);
368 if (! status) {
369 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
370 state->db->db_name,
371 (ctdb_db_persistent(state->db)
372 ? "DB_ATTACH_PERSISTENT"
373 : (ctdb_db_replicated(state->db)
374 ? "DB_ATTACH_REPLICATED"
375 : "DB_ATTACH")),
376 ret));
377 tevent_req_error(req, ret);
378 return;
381 if (ctdb_db_persistent(state->db)) {
382 ret = ctdb_reply_control_db_attach_persistent(
383 reply, &state->db->db_id);
384 } else if (ctdb_db_replicated(state->db)) {
385 ret = ctdb_reply_control_db_attach_replicated(
386 reply, &state->db->db_id);
387 } else {
388 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
390 talloc_free(reply);
391 if (ret != 0) {
392 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
393 state->db->db_name, ret));
394 tevent_req_error(req, ret);
395 return;
398 ctdb_req_control_getdbpath(&request, state->db->db_id);
399 subreq = ctdb_client_control_send(state, state->ev, state->client,
400 state->destnode, state->timeout,
401 &request);
402 if (tevent_req_nomem(subreq, req)) {
403 return;
405 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
408 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
410 struct tevent_req *req = tevent_req_callback_data(
411 subreq, struct tevent_req);
412 struct ctdb_attach_state *state = tevent_req_data(
413 req, struct ctdb_attach_state);
414 struct ctdb_reply_control *reply;
415 struct ctdb_req_control request;
416 bool status;
417 int ret;
419 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
420 TALLOC_FREE(subreq);
421 if (! status) {
422 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
423 state->db->db_name, ret));
424 tevent_req_error(req, ret);
425 return;
428 ret = ctdb_reply_control_getdbpath(reply, state->db,
429 &state->db->db_path);
430 talloc_free(reply);
431 if (ret != 0) {
432 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
433 state->db->db_name, ret));
434 tevent_req_error(req, ret);
435 return;
438 ctdb_req_control_db_get_health(&request, state->db->db_id);
439 subreq = ctdb_client_control_send(state, state->ev, state->client,
440 state->destnode, state->timeout,
441 &request);
442 if (tevent_req_nomem(subreq, req)) {
443 return;
445 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
448 static void ctdb_attach_health_done(struct tevent_req *subreq)
450 struct tevent_req *req = tevent_req_callback_data(
451 subreq, struct tevent_req);
452 struct ctdb_attach_state *state = tevent_req_data(
453 req, struct ctdb_attach_state);
454 struct ctdb_reply_control *reply;
455 const char *reason;
456 bool status;
457 int ret;
459 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
460 TALLOC_FREE(subreq);
461 if (! status) {
462 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
463 state->db->db_name, ret));
464 tevent_req_error(req, ret);
465 return;
468 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
469 if (ret != 0) {
470 DEBUG(DEBUG_ERR,
471 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
472 state->db->db_name, ret));
473 tevent_req_error(req, ret);
474 return;
477 if (reason != NULL) {
478 /* Database unhealthy, avoid attach */
479 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
480 state->db->db_name, reason));
481 tevent_req_error(req, EIO);
482 return;
485 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
486 state->destnode, state->timeout,
487 state->db->db_id, state->db_flags);
488 if (tevent_req_nomem(subreq, req)) {
489 return;
491 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
494 static void ctdb_attach_flags_done(struct tevent_req *subreq)
496 struct tevent_req *req = tevent_req_callback_data(
497 subreq, struct tevent_req);
498 struct ctdb_attach_state *state = tevent_req_data(
499 req, struct ctdb_attach_state);
500 struct ctdb_req_control request;
501 bool status;
502 int ret;
504 status = ctdb_set_db_flags_recv(subreq, &ret);
505 TALLOC_FREE(subreq);
506 if (! status) {
507 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
508 state->db->db_name, state->db_flags));
509 tevent_req_error(req, ret);
510 return;
513 ctdb_req_control_db_open_flags(&request, state->db->db_id);
514 subreq = ctdb_client_control_send(state, state->ev, state->client,
515 state->destnode, state->timeout,
516 &request);
517 if (tevent_req_nomem(subreq, req)) {
518 return;
520 tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
523 static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
525 struct tevent_req *req = tevent_req_callback_data(
526 subreq, struct tevent_req);
527 struct ctdb_attach_state *state = tevent_req_data(
528 req, struct ctdb_attach_state);
529 struct ctdb_reply_control *reply;
530 bool status;
531 int ret, tdb_flags;
533 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
534 TALLOC_FREE(subreq);
535 if (! status) {
536 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
537 state->db->db_name, ret));
538 tevent_req_error(req, ret);
539 return;
542 ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
543 talloc_free(reply);
544 if (ret != 0) {
545 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
546 " ret=%d\n", state->db->db_name, ret));
547 tevent_req_error(req, ret);
548 return;
551 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
552 tdb_flags, O_RDWR, 0);
553 if (tevent_req_nomem(state->db->ltdb, req)) {
554 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
555 state->db->db_name));
556 return;
558 DLIST_ADD(state->client->db, state->db);
560 tevent_req_done(req);
563 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
564 struct ctdb_db_context **out)
566 struct ctdb_attach_state *state = tevent_req_data(
567 req, struct ctdb_attach_state);
568 int err;
570 if (tevent_req_is_unix_error(req, &err)) {
571 if (perr != NULL) {
572 *perr = err;
574 return false;
577 if (out != NULL) {
578 *out = state->db;
580 return true;
583 int ctdb_attach(struct tevent_context *ev,
584 struct ctdb_client_context *client,
585 struct timeval timeout,
586 const char *db_name, uint8_t db_flags,
587 struct ctdb_db_context **out)
589 TALLOC_CTX *mem_ctx;
590 struct tevent_req *req;
591 bool status;
592 int ret;
594 mem_ctx = talloc_new(client);
595 if (mem_ctx == NULL) {
596 return ENOMEM;
599 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
600 db_name, db_flags);
601 if (req == NULL) {
602 talloc_free(mem_ctx);
603 return ENOMEM;
606 tevent_req_poll(req, ev);
608 status = ctdb_attach_recv(req, &ret, out);
609 if (! status) {
610 talloc_free(mem_ctx);
611 return ret;
615 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
616 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
617 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
620 talloc_free(mem_ctx);
621 return 0;
/* State carried across the asynchronous steps of detaching a database. */
struct ctdb_detach_state {
	struct ctdb_client_context *client;
	struct tevent_context *ev;
	struct timeval timeout;
	uint32_t db_id;
	/* Filled in from the GET_DBNAME reply */
	const char *db_name;
};

/* Completion callbacks for the two controls */
static void ctdb_detach_dbname_done(struct tevent_req *subreq);
static void ctdb_detach_done(struct tevent_req *subreq);
635 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
636 struct tevent_context *ev,
637 struct ctdb_client_context *client,
638 struct timeval timeout, uint32_t db_id)
640 struct tevent_req *req, *subreq;
641 struct ctdb_detach_state *state;
642 struct ctdb_req_control request;
644 req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
645 if (req == NULL) {
646 return NULL;
649 state->client = client;
650 state->ev = ev;
651 state->timeout = timeout;
652 state->db_id = db_id;
654 ctdb_req_control_get_dbname(&request, db_id);
655 subreq = ctdb_client_control_send(state, ev, client,
656 ctdb_client_pnn(client), timeout,
657 &request);
658 if (tevent_req_nomem(subreq, req)) {
659 return tevent_req_post(req, ev);
661 tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
663 return req;
666 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
668 struct tevent_req *req = tevent_req_callback_data(
669 subreq, struct tevent_req);
670 struct ctdb_detach_state *state = tevent_req_data(
671 req, struct ctdb_detach_state);
672 struct ctdb_reply_control *reply;
673 struct ctdb_req_control request;
674 int ret;
675 bool status;
677 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
678 TALLOC_FREE(subreq);
679 if (! status) {
680 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
681 state->db_id, ret));
682 tevent_req_error(req, ret);
683 return;
686 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
687 if (ret != 0) {
688 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
689 state->db_id, ret));
690 tevent_req_error(req, ret);
691 return;
694 ctdb_req_control_db_detach(&request, state->db_id);
695 subreq = ctdb_client_control_send(state, state->ev, state->client,
696 ctdb_client_pnn(state->client),
697 state->timeout, &request);
698 if (tevent_req_nomem(subreq, req)) {
699 return;
701 tevent_req_set_callback(subreq, ctdb_detach_done, req);
705 static void ctdb_detach_done(struct tevent_req *subreq)
707 struct tevent_req *req = tevent_req_callback_data(
708 subreq, struct tevent_req);
709 struct ctdb_detach_state *state = tevent_req_data(
710 req, struct ctdb_detach_state);
711 struct ctdb_reply_control *reply;
712 struct ctdb_db_context *db;
713 int ret;
714 bool status;
716 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
717 TALLOC_FREE(subreq);
718 if (! status) {
719 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
720 state->db_name, ret));
721 tevent_req_error(req, ret);
722 return;
725 ret = ctdb_reply_control_db_detach(reply);
726 if (ret != 0) {
727 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
728 state->db_name, ret));
729 tevent_req_error(req, ret);
730 return;
733 db = client_db_handle(state->client, state->db_name);
734 if (db != NULL) {
735 DLIST_REMOVE(state->client->db, db);
736 TALLOC_FREE(db);
739 tevent_req_done(req);
/*
 * Collect the result of ctdb_detach_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
bool ctdb_detach_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
756 int ctdb_detach(struct tevent_context *ev,
757 struct ctdb_client_context *client,
758 struct timeval timeout, uint32_t db_id)
760 TALLOC_CTX *mem_ctx;
761 struct tevent_req *req;
762 int ret;
763 bool status;
765 mem_ctx = talloc_new(client);
766 if (mem_ctx == NULL) {
767 return ENOMEM;
770 req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
771 if (req == NULL) {
772 talloc_free(mem_ctx);
773 return ENOMEM;
776 tevent_req_poll(req, ev);
778 status = ctdb_detach_recv(req, &ret);
779 if (! status) {
780 talloc_free(mem_ctx);
781 return ret;
784 talloc_free(mem_ctx);
785 return 0;
788 uint32_t ctdb_db_id(struct ctdb_db_context *db)
790 return db->db_id;
793 struct ctdb_db_traverse_local_state {
794 ctdb_rec_parser_func_t parser;
795 void *private_data;
796 bool extract_header;
797 int error;
800 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
801 TDB_DATA key, TDB_DATA data,
802 void *private_data)
804 struct ctdb_db_traverse_local_state *state =
805 (struct ctdb_db_traverse_local_state *)private_data;
806 int ret;
808 if (state->extract_header) {
809 struct ctdb_ltdb_header header;
811 ret = ctdb_ltdb_header_extract(&data, &header);
812 if (ret != 0) {
813 state->error = ret;
814 return 1;
817 ret = state->parser(0, &header, key, data, state->private_data);
818 } else {
819 ret = state->parser(0, NULL, key, data, state->private_data);
822 if (ret != 0) {
823 state->error = ret;
824 return 1;
827 return 0;
830 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
831 bool extract_header,
832 ctdb_rec_parser_func_t parser, void *private_data)
834 struct ctdb_db_traverse_local_state state;
835 int ret;
837 state.parser = parser;
838 state.private_data = private_data;
839 state.extract_header = extract_header;
840 state.error = 0;
842 if (readonly) {
843 ret = tdb_traverse_read(client_db_tdb(db),
844 ctdb_db_traverse_local_handler,
845 &state);
846 } else {
847 ret = tdb_traverse(client_db_tdb(db),
848 ctdb_db_traverse_local_handler, &state);
851 if (ret == -1) {
852 return EIO;
855 return state.error;
858 struct ctdb_db_traverse_state {
859 struct tevent_context *ev;
860 struct ctdb_client_context *client;
861 struct ctdb_db_context *db;
862 uint32_t destnode;
863 uint64_t srvid;
864 struct timeval timeout;
865 ctdb_rec_parser_func_t parser;
866 void *private_data;
867 int result;
870 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
871 static void ctdb_db_traverse_started(struct tevent_req *subreq);
872 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
873 void *private_data);
874 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
875 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
877 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
878 struct tevent_context *ev,
879 struct ctdb_client_context *client,
880 struct ctdb_db_context *db,
881 uint32_t destnode,
882 struct timeval timeout,
883 ctdb_rec_parser_func_t parser,
884 void *private_data)
886 struct tevent_req *req, *subreq;
887 struct ctdb_db_traverse_state *state;
889 req = tevent_req_create(mem_ctx, &state,
890 struct ctdb_db_traverse_state);
891 if (req == NULL) {
892 return NULL;
895 state->ev = ev;
896 state->client = client;
897 state->db = db;
898 state->destnode = destnode;
899 state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
900 state->timeout = timeout;
901 state->parser = parser;
902 state->private_data = private_data;
904 subreq = ctdb_client_set_message_handler_send(state, ev, client,
905 state->srvid,
906 ctdb_db_traverse_handler,
907 req);
908 if (tevent_req_nomem(subreq, req)) {
909 return tevent_req_post(req, ev);
911 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
913 return req;
916 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
918 struct tevent_req *req = tevent_req_callback_data(
919 subreq, struct tevent_req);
920 struct ctdb_db_traverse_state *state = tevent_req_data(
921 req, struct ctdb_db_traverse_state);
922 struct ctdb_traverse_start_ext traverse;
923 struct ctdb_req_control request;
924 int ret = 0;
925 bool status;
927 status = ctdb_client_set_message_handler_recv(subreq, &ret);
928 TALLOC_FREE(subreq);
929 if (! status) {
930 tevent_req_error(req, ret);
931 return;
934 traverse = (struct ctdb_traverse_start_ext) {
935 .db_id = ctdb_db_id(state->db),
936 .reqid = 0,
937 .srvid = state->srvid,
938 .withemptyrecords = false,
941 ctdb_req_control_traverse_start_ext(&request, &traverse);
942 subreq = ctdb_client_control_send(state, state->ev, state->client,
943 state->destnode, state->timeout,
944 &request);
945 if (subreq == NULL) {
946 state->result = ENOMEM;
947 ctdb_db_traverse_remove_handler(req);
948 return;
950 tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
953 static void ctdb_db_traverse_started(struct tevent_req *subreq)
955 struct tevent_req *req = tevent_req_callback_data(
956 subreq, struct tevent_req);
957 struct ctdb_db_traverse_state *state = tevent_req_data(
958 req, struct ctdb_db_traverse_state);
959 struct ctdb_reply_control *reply;
960 int ret = 0;
961 bool status;
963 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
964 TALLOC_FREE(subreq);
965 if (! status) {
966 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
967 state->result = ret;
968 ctdb_db_traverse_remove_handler(req);
969 return;
972 ret = ctdb_reply_control_traverse_start_ext(reply);
973 talloc_free(reply);
974 if (ret != 0) {
975 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
976 ret));
977 state->result = ret;
978 ctdb_db_traverse_remove_handler(req);
979 return;
983 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
984 void *private_data)
986 struct tevent_req *req = talloc_get_type_abort(
987 private_data, struct tevent_req);
988 struct ctdb_db_traverse_state *state = tevent_req_data(
989 req, struct ctdb_db_traverse_state);
990 struct ctdb_rec_data *rec;
991 struct ctdb_ltdb_header header;
992 size_t np;
993 int ret;
995 ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np);
996 if (ret != 0) {
997 return;
1000 if (rec->key.dsize == 0 && rec->data.dsize == 0) {
1001 talloc_free(rec);
1002 ctdb_db_traverse_remove_handler(req);
1003 return;
1006 ret = ctdb_ltdb_header_extract(&rec->data, &header);
1007 if (ret != 0) {
1008 talloc_free(rec);
1009 return;
1012 if (rec->data.dsize == 0) {
1013 talloc_free(rec);
1014 return;
1017 ret = state->parser(rec->reqid, &header, rec->key, rec->data,
1018 state->private_data);
1019 talloc_free(rec);
1020 if (ret != 0) {
1021 state->result = ret;
1022 ctdb_db_traverse_remove_handler(req);
1026 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
1028 struct ctdb_db_traverse_state *state = tevent_req_data(
1029 req, struct ctdb_db_traverse_state);
1030 struct tevent_req *subreq;
1032 subreq = ctdb_client_remove_message_handler_send(state, state->ev,
1033 state->client,
1034 state->srvid, req);
1035 if (tevent_req_nomem(subreq, req)) {
1036 return;
1038 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1041 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1043 struct tevent_req *req = tevent_req_callback_data(
1044 subreq, struct tevent_req);
1045 struct ctdb_db_traverse_state *state = tevent_req_data(
1046 req, struct ctdb_db_traverse_state);
1047 int ret;
1048 bool status;
1050 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1051 TALLOC_FREE(subreq);
1052 if (! status) {
1053 tevent_req_error(req, ret);
1054 return;
1057 if (state->result != 0) {
1058 tevent_req_error(req, state->result);
1059 return;
1062 tevent_req_done(req);
/*
 * Collect the result of ctdb_db_traverse_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the error code in *perr.
 */
bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1079 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1080 struct ctdb_client_context *client,
1081 struct ctdb_db_context *db,
1082 uint32_t destnode, struct timeval timeout,
1083 ctdb_rec_parser_func_t parser, void *private_data)
1085 struct tevent_req *req;
1086 int ret = 0;
1087 bool status;
1089 req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1090 timeout, parser, private_data);
1091 if (req == NULL) {
1092 return ENOMEM;
1095 tevent_req_poll(req, ev);
1097 status = ctdb_db_traverse_recv(req, &ret);
1098 if (! status) {
1099 return ret;
1102 return 0;
1105 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1106 struct ctdb_ltdb_header *header,
1107 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1109 TDB_DATA rec;
1110 size_t np;
1111 int ret;
1113 rec = tdb_fetch(client_db_tdb(db), key);
1114 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1115 /* No record present */
1116 if (rec.dptr != NULL) {
1117 free(rec.dptr);
1120 if (tdb_error(client_db_tdb(db)) != TDB_ERR_NOEXIST) {
1121 return EIO;
1124 *header = (struct ctdb_ltdb_header) {
1125 .dmaster = CTDB_UNKNOWN_PNN,
1128 if (data != NULL) {
1129 *data = tdb_null;
1131 return 0;
1134 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
1135 if (ret != 0) {
1136 return ret;
1139 ret = 0;
1140 if (data != NULL) {
1141 data->dsize = rec.dsize - np;
1142 data->dptr = talloc_memdup(mem_ctx, rec.dptr + np,
1143 data->dsize);
1144 if (data->dptr == NULL) {
1145 ret = ENOMEM;
1149 free(rec.dptr);
1150 return ret;
/*
 * Fetch a record from a volatile database
 *
 * Steps:
 *  1. Get a lock on the hash chain
 *  2. If the record does not exist, migrate the record
 *  3. If readonly=true and delegations do not exist, migrate the record.
 *  4. If readonly=false and delegations exist, migrate the record.
 *  5. If the local node is not dmaster, migrate the record.
 *  6. Return record
 */

/* State carried across the steps of a fetch-lock request. */
struct ctdb_fetch_lock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Record handle being built for the caller */
	struct ctdb_record_handle *h;
	/* Whether the caller asked for read-only access */
	bool readonly;
	/* PNN of the local node, compared against the record's dmaster */
	uint32_t pnn;
};

static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1177 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1178 struct tevent_context *ev,
1179 struct ctdb_client_context *client,
1180 struct ctdb_db_context *db,
1181 TDB_DATA key, bool readonly)
1183 struct ctdb_fetch_lock_state *state;
1184 struct tevent_req *req;
1185 int ret;
1187 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1188 if (req == NULL) {
1189 return NULL;
1192 state->ev = ev;
1193 state->client = client;
1195 state->h = talloc_zero(db, struct ctdb_record_handle);
1196 if (tevent_req_nomem(state->h, req)) {
1197 return tevent_req_post(req, ev);
1199 state->h->ev = ev;
1200 state->h->client = client;
1201 state->h->db = db;
1202 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1203 if (tevent_req_nomem(state->h->key.dptr, req)) {
1204 return tevent_req_post(req, ev);
1206 state->h->key.dsize = key.dsize;
1207 state->h->readonly = false;
1209 state->readonly = readonly;
1210 state->pnn = ctdb_client_pnn(client);
1212 /* Check that database is not persistent */
1213 if (! ctdb_db_volatile(db)) {
1214 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1215 db->db_name));
1216 tevent_req_error(req, EINVAL);
1217 return tevent_req_post(req, ev);
1220 ret = ctdb_fetch_lock_check(req);
1221 if (ret == 0) {
1222 tevent_req_done(req);
1223 return tevent_req_post(req, ev);
1225 if (ret != EAGAIN) {
1226 tevent_req_error(req, ret);
1227 return tevent_req_post(req, ev);
1229 return req;
1232 static int ctdb_fetch_lock_check(struct tevent_req *req)
1234 struct ctdb_fetch_lock_state *state = tevent_req_data(
1235 req, struct ctdb_fetch_lock_state);
1236 struct ctdb_record_handle *h = state->h;
1237 struct ctdb_ltdb_header header;
1238 TDB_DATA data = tdb_null;
1239 size_t np;
1240 int ret, err = 0;
1241 bool do_migrate = false;
1243 ret = tdb_chainlock(client_db_tdb(h->db), h->key);
1244 if (ret != 0) {
1245 DEBUG(DEBUG_ERR,
1246 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1247 h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
1248 err = EIO;
1249 goto failed;
1252 data = tdb_fetch(client_db_tdb(h->db), h->key);
1253 if (data.dptr == NULL) {
1254 if (tdb_error(client_db_tdb(h->db)) == TDB_ERR_NOEXIST) {
1255 goto migrate;
1256 } else {
1257 err = EIO;
1258 goto failed;
1262 /* Got the record */
1263 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np);
1264 if (ret != 0) {
1265 err = ret;
1266 goto failed;
1269 if (! state->readonly) {
1270 /* Read/write access */
1271 if (header.dmaster == state->pnn &&
1272 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1273 goto migrate;
1276 if (header.dmaster != state->pnn) {
1277 goto migrate;
1279 } else {
1280 /* Readonly access */
1281 if (header.dmaster != state->pnn &&
1282 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1283 CTDB_REC_RO_HAVE_DELEGATIONS))) {
1284 goto migrate;
1288 /* We are the dmaster or readonly delegation */
1289 h->header = header;
1290 h->data = data;
1291 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1292 CTDB_REC_RO_HAVE_DELEGATIONS)) {
1293 h->readonly = true;
1295 return 0;
1297 migrate:
1298 do_migrate = true;
1299 err = EAGAIN;
1301 failed:
1302 if (data.dptr != NULL) {
1303 free(data.dptr);
1305 ret = tdb_chainunlock(client_db_tdb(h->db), h->key);
1306 if (ret != 0) {
1307 DEBUG(DEBUG_ERR,
1308 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1309 h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
1310 return EIO;
1313 if (do_migrate) {
1314 ctdb_fetch_lock_migrate(req);
1316 return err;
1319 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1321 struct ctdb_fetch_lock_state *state = tevent_req_data(
1322 req, struct ctdb_fetch_lock_state);
1323 struct ctdb_req_call request;
1324 struct tevent_req *subreq;
1326 ZERO_STRUCT(request);
1327 request.flags = CTDB_IMMEDIATE_MIGRATION;
1328 if (state->readonly) {
1329 request.flags |= CTDB_WANT_READONLY;
1331 request.db_id = state->h->db->db_id;
1332 request.callid = CTDB_NULL_FUNC;
1333 request.key = state->h->key;
1334 request.calldata = tdb_null;
1336 subreq = ctdb_client_call_send(state, state->ev, state->client,
1337 &request);
1338 if (tevent_req_nomem(subreq, req)) {
1339 return;
1342 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1345 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1347 struct tevent_req *req = tevent_req_callback_data(
1348 subreq, struct tevent_req);
1349 struct ctdb_fetch_lock_state *state = tevent_req_data(
1350 req, struct ctdb_fetch_lock_state);
1351 struct ctdb_reply_call *reply;
1352 int ret;
1353 bool status;
1355 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1356 TALLOC_FREE(subreq);
1357 if (! status) {
1358 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1359 state->h->db->db_name, ret));
1360 tevent_req_error(req, ret);
1361 return;
1364 if (reply->status != 0) {
1365 tevent_req_error(req, EIO);
1366 return;
1368 talloc_free(reply);
1370 ret = ctdb_fetch_lock_check(req);
1371 if (ret != 0) {
1372 if (ret != EAGAIN) {
1373 tevent_req_error(req, ret);
1375 return;
1378 tevent_req_done(req);
1381 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1383 int ret;
1385 ret = tdb_chainunlock(client_db_tdb(h->db), h->key);
1386 if (ret != 0) {
1387 DEBUG(DEBUG_ERR,
1388 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1389 h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
1391 free(h->data.dptr);
1392 return 0;
1395 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1396 struct ctdb_ltdb_header *header,
1397 TALLOC_CTX *mem_ctx,
1398 TDB_DATA *data, int *perr)
1400 struct ctdb_fetch_lock_state *state = tevent_req_data(
1401 req, struct ctdb_fetch_lock_state);
1402 struct ctdb_record_handle *h = state->h;
1403 int err;
1405 if (tevent_req_is_unix_error(req, &err)) {
1406 if (perr != NULL) {
1407 TALLOC_FREE(state->h);
1408 *perr = err;
1410 return NULL;
1413 if (header != NULL) {
1414 *header = h->header;
1416 if (data != NULL) {
1417 size_t offset;
1419 offset = ctdb_ltdb_header_len(&h->header);
1421 data->dsize = h->data.dsize - offset;
1422 if (data->dsize == 0) {
1423 data->dptr = NULL;
1424 } else {
1425 data->dptr = talloc_memdup(mem_ctx,
1426 h->data.dptr + offset,
1427 data->dsize);
1428 if (data->dptr == NULL) {
1429 TALLOC_FREE(state->h);
1430 if (perr != NULL) {
1431 *perr = ENOMEM;
1433 return NULL;
1438 talloc_set_destructor(h, ctdb_record_handle_destructor);
1439 return h;
1442 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1443 struct ctdb_client_context *client,
1444 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1445 struct ctdb_record_handle **out,
1446 struct ctdb_ltdb_header *header, TDB_DATA *data)
1448 struct tevent_req *req;
1449 struct ctdb_record_handle *h;
1450 int ret = 0;
1452 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1453 if (req == NULL) {
1454 return ENOMEM;
1457 tevent_req_poll(req, ev);
1459 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1460 if (h == NULL) {
1461 return ret;
1464 *out = h;
1465 return 0;
1468 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1470 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1471 TDB_DATA rec[2];
1472 size_t np;
1473 int ret;
1475 /* Cannot modify the record if it was obtained as a readonly copy */
1476 if (h->readonly) {
1477 return EINVAL;
1480 /* Check if the new data is same */
1481 if (h->data.dsize == data.dsize &&
1482 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1483 /* No need to do anything */
1484 return 0;
1487 ctdb_ltdb_header_push(&h->header, header, &np);
1489 rec[0].dsize = np;
1490 rec[0].dptr = header;
1492 rec[1].dsize = data.dsize;
1493 rec[1].dptr = data.dptr;
1495 ret = tdb_storev(client_db_tdb(h->db), h->key, rec, 2, TDB_REPLACE);
1496 if (ret != 0) {
1497 DEBUG(DEBUG_ERR,
1498 ("store_record: %s tdb_storev failed, %s\n",
1499 h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
1500 return EIO;
1503 return 0;
/* Per-request state of an asynchronous record deletion. */
struct ctdb_delete_record_state {
	/* Handle of the record being deleted (owned by the caller) */
	struct ctdb_record_handle *h;
};

static void ctdb_delete_record_done(struct tevent_req *subreq);
1512 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1513 struct tevent_context *ev,
1514 struct ctdb_record_handle *h)
1516 struct tevent_req *req, *subreq;
1517 struct ctdb_delete_record_state *state;
1518 struct ctdb_key_data key;
1519 struct ctdb_req_control request;
1520 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1521 TDB_DATA rec;
1522 size_t np;
1523 int ret;
1525 req = tevent_req_create(mem_ctx, &state,
1526 struct ctdb_delete_record_state);
1527 if (req == NULL) {
1528 return NULL;
1531 state->h = h;
1533 /* Cannot delete the record if it was obtained as a readonly copy */
1534 if (h->readonly) {
1535 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1536 h->db->db_name));
1537 tevent_req_error(req, EINVAL);
1538 return tevent_req_post(req, ev);
1541 ctdb_ltdb_header_push(&h->header, header, &np);
1543 rec.dsize = np;
1544 rec.dptr = header;
1546 ret = tdb_store(client_db_tdb(h->db), h->key, rec, TDB_REPLACE);
1547 if (ret != 0) {
1548 D_ERR("fetch_lock delete: %s tdb_store failed, %s\n",
1549 h->db->db_name,
1550 tdb_errorstr(client_db_tdb(h->db)));
1551 tevent_req_error(req, EIO);
1552 return tevent_req_post(req, ev);
1555 key.db_id = h->db->db_id;
1556 key.header = h->header;
1557 key.key = h->key;
1559 ctdb_req_control_schedule_for_deletion(&request, &key);
1560 subreq = ctdb_client_control_send(state, ev, h->client,
1561 ctdb_client_pnn(h->client),
1562 tevent_timeval_zero(),
1563 &request);
1564 if (tevent_req_nomem(subreq, req)) {
1565 return tevent_req_post(req, ev);
1567 tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1569 return req;
1572 static void ctdb_delete_record_done(struct tevent_req *subreq)
1574 struct tevent_req *req = tevent_req_callback_data(
1575 subreq, struct tevent_req);
1576 struct ctdb_delete_record_state *state = tevent_req_data(
1577 req, struct ctdb_delete_record_state);
1578 int ret;
1579 bool status;
1581 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1582 TALLOC_FREE(subreq);
1583 if (! status) {
1584 D_ERR("delete_record: %s SCHEDULE_FOR_DELETION failed, ret=%d\n",
1585 state->h->db->db_name,
1586 ret);
1587 tevent_req_error(req, ret);
1588 return;
1591 tevent_req_done(req);
1594 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1596 int err;
1598 if (tevent_req_is_unix_error(req, &err)) {
1599 if (perr != NULL) {
1600 *perr = err;
1602 return false;
1605 return true;
1609 int ctdb_delete_record(struct ctdb_record_handle *h)
1611 struct tevent_context *ev = h->ev;
1612 TALLOC_CTX *mem_ctx;
1613 struct tevent_req *req;
1614 int ret;
1615 bool status;
1617 mem_ctx = talloc_new(NULL);
1618 if (mem_ctx == NULL) {
1619 return ENOMEM;
1622 req = ctdb_delete_record_send(mem_ctx, ev, h);
1623 if (req == NULL) {
1624 talloc_free(mem_ctx);
1625 return ENOMEM;
1628 tevent_req_poll(req, ev);
1630 status = ctdb_delete_record_recv(req, &ret);
1631 talloc_free(mem_ctx);
1632 if (! status) {
1633 return ret;
1636 return 0;
/*
 * Global lock functions
 */
1643 struct ctdb_g_lock_lock_state {
1644 struct tevent_context *ev;
1645 struct ctdb_client_context *client;
1646 struct ctdb_db_context *db;
1647 TDB_DATA key;
1648 struct ctdb_server_id my_sid;
1649 enum ctdb_g_lock_type lock_type;
1650 struct ctdb_record_handle *h;
1651 /* state for verification of active locks */
1652 struct ctdb_g_lock_list *lock_list;
1653 unsigned int current;
1656 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1657 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1658 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1659 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1660 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1662 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1663 enum ctdb_g_lock_type l2)
1665 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1666 return false;
1668 return true;
1671 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1672 struct tevent_context *ev,
1673 struct ctdb_client_context *client,
1674 struct ctdb_db_context *db,
1675 const char *keyname,
1676 struct ctdb_server_id *sid,
1677 bool readonly)
1679 struct tevent_req *req, *subreq;
1680 struct ctdb_g_lock_lock_state *state;
1682 req = tevent_req_create(mem_ctx, &state,
1683 struct ctdb_g_lock_lock_state);
1684 if (req == NULL) {
1685 return NULL;
1688 state->ev = ev;
1689 state->client = client;
1690 state->db = db;
1691 state->key.dptr = discard_const(keyname);
1692 state->key.dsize = strlen(keyname) + 1;
1693 state->my_sid = *sid;
1694 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1696 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1697 false);
1698 if (tevent_req_nomem(subreq, req)) {
1699 return tevent_req_post(req, ev);
1701 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1703 return req;
1706 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1708 struct tevent_req *req = tevent_req_callback_data(
1709 subreq, struct tevent_req);
1710 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1711 req, struct ctdb_g_lock_lock_state);
1712 TDB_DATA data;
1713 size_t np;
1714 int ret = 0;
1716 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1717 TALLOC_FREE(subreq);
1718 if (state->h == NULL) {
1719 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1720 (char *)state->key.dptr));
1721 tevent_req_error(req, ret);
1722 return;
1725 if (state->lock_list != NULL) {
1726 TALLOC_FREE(state->lock_list);
1727 state->current = 0;
1730 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1731 &state->lock_list, &np);
1732 talloc_free(data.dptr);
1733 if (ret != 0) {
1734 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1735 (char *)state->key.dptr));
1736 tevent_req_error(req, ret);
1737 return;
1740 ctdb_g_lock_lock_process_locks(req);
1743 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1745 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1746 req, struct ctdb_g_lock_lock_state);
1747 struct tevent_req *subreq;
1748 struct ctdb_g_lock *lock;
1749 bool check_server = false;
1750 int ret;
1752 while (state->current < state->lock_list->num) {
1753 lock = &state->lock_list->lock[state->current];
1755 /* We should not ask for the same lock more than once */
1756 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1757 DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1758 (char *)state->key.dptr));
1759 tevent_req_error(req, EDEADLK);
1760 return;
1763 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1764 check_server = true;
1765 break;
1768 state->current += 1;
1771 if (check_server) {
1772 struct ctdb_req_control request;
1774 ctdb_req_control_process_exists(&request, lock->sid.pid);
1775 subreq = ctdb_client_control_send(state, state->ev,
1776 state->client,
1777 lock->sid.vnn,
1778 tevent_timeval_zero(),
1779 &request);
1780 if (tevent_req_nomem(subreq, req)) {
1781 return;
1783 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1784 return;
1787 /* There is no conflict, add ourself to the lock_list */
1788 state->lock_list->lock = talloc_realloc(state->lock_list,
1789 state->lock_list->lock,
1790 struct ctdb_g_lock,
1791 state->lock_list->num + 1);
1792 if (state->lock_list->lock == NULL) {
1793 tevent_req_error(req, ENOMEM);
1794 return;
1797 lock = &state->lock_list->lock[state->lock_list->num];
1798 lock->type = state->lock_type;
1799 lock->sid = state->my_sid;
1800 state->lock_list->num += 1;
1802 ret = ctdb_g_lock_lock_update(req);
1803 if (ret != 0) {
1804 tevent_req_error(req, ret);
1805 return;
1808 TALLOC_FREE(state->h);
1809 tevent_req_done(req);
1812 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1814 struct tevent_req *req = tevent_req_callback_data(
1815 subreq, struct tevent_req);
1816 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1817 req, struct ctdb_g_lock_lock_state);
1818 struct ctdb_reply_control *reply;
1819 int ret, value;
1820 bool status;
1822 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1823 TALLOC_FREE(subreq);
1824 if (! status) {
1825 DEBUG(DEBUG_ERR,
1826 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1827 (char *)state->key.dptr, ret));
1828 tevent_req_error(req, ret);
1829 return;
1832 ret = ctdb_reply_control_process_exists(reply, &value);
1833 if (ret != 0) {
1834 tevent_req_error(req, ret);
1835 return;
1837 talloc_free(reply);
1839 if (value == 0) {
1840 /* server process exists, need to retry */
1841 TALLOC_FREE(state->h);
1842 subreq = tevent_wakeup_send(state, state->ev,
1843 tevent_timeval_current_ofs(0,1000));
1844 if (tevent_req_nomem(subreq, req)) {
1845 return;
1847 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1848 return;
1851 /* server process does not exist, remove conflicting entry */
1852 state->lock_list->lock[state->current] =
1853 state->lock_list->lock[state->lock_list->num-1];
1854 state->lock_list->num -= 1;
1856 ret = ctdb_g_lock_lock_update(req);
1857 if (ret != 0) {
1858 tevent_req_error(req, ret);
1859 return;
1862 ctdb_g_lock_lock_process_locks(req);
1865 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1867 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1868 req, struct ctdb_g_lock_lock_state);
1869 TDB_DATA data;
1870 size_t np;
1871 int ret;
1873 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1874 data.dptr = talloc_size(state, data.dsize);
1875 if (data.dptr == NULL) {
1876 return ENOMEM;
1879 ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
1880 ret = ctdb_store_record(state->h, data);
1881 talloc_free(data.dptr);
1882 return ret;
1885 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1887 struct tevent_req *req = tevent_req_callback_data(
1888 subreq, struct tevent_req);
1889 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1890 req, struct ctdb_g_lock_lock_state);
1891 bool success;
1893 success = tevent_wakeup_recv(subreq);
1894 TALLOC_FREE(subreq);
1895 if (! success) {
1896 tevent_req_error(req, ENOMEM);
1897 return;
1900 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1901 state->db, state->key, false);
1902 if (tevent_req_nomem(subreq, req)) {
1903 return;
1905 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1908 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1910 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1911 req, struct ctdb_g_lock_lock_state);
1912 int err;
1914 TALLOC_FREE(state->h);
1916 if (tevent_req_is_unix_error(req, &err)) {
1917 if (perr != NULL) {
1918 *perr = err;
1920 return false;
1923 return true;
1926 struct ctdb_g_lock_unlock_state {
1927 struct tevent_context *ev;
1928 struct ctdb_client_context *client;
1929 struct ctdb_db_context *db;
1930 TDB_DATA key;
1931 struct ctdb_server_id my_sid;
1932 struct ctdb_record_handle *h;
1933 struct ctdb_g_lock_list *lock_list;
1936 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1937 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1938 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1940 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1941 struct tevent_context *ev,
1942 struct ctdb_client_context *client,
1943 struct ctdb_db_context *db,
1944 const char *keyname,
1945 struct ctdb_server_id sid)
1947 struct tevent_req *req, *subreq;
1948 struct ctdb_g_lock_unlock_state *state;
1950 req = tevent_req_create(mem_ctx, &state,
1951 struct ctdb_g_lock_unlock_state);
1952 if (req == NULL) {
1953 return NULL;
1956 state->ev = ev;
1957 state->client = client;
1958 state->db = db;
1959 state->key.dptr = discard_const(keyname);
1960 state->key.dsize = strlen(keyname) + 1;
1961 state->my_sid = sid;
1963 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1964 false);
1965 if (tevent_req_nomem(subreq, req)) {
1966 return tevent_req_post(req, ev);
1968 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1970 return req;
1973 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1975 struct tevent_req *req = tevent_req_callback_data(
1976 subreq, struct tevent_req);
1977 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1978 req, struct ctdb_g_lock_unlock_state);
1979 TDB_DATA data;
1980 size_t np;
1981 int ret = 0;
1983 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1984 TALLOC_FREE(subreq);
1985 if (state->h == NULL) {
1986 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1987 (char *)state->key.dptr));
1988 tevent_req_error(req, ret);
1989 return;
1992 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1993 &state->lock_list, &np);
1994 if (ret != 0) {
1995 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1996 (char *)state->key.dptr));
1997 tevent_req_error(req, ret);
1998 return;
2001 ret = ctdb_g_lock_unlock_update(req);
2002 if (ret != 0) {
2003 tevent_req_error(req, ret);
2004 return;
2007 if (state->lock_list->num == 0) {
2008 subreq = ctdb_delete_record_send(state, state->ev, state->h);
2009 if (tevent_req_nomem(subreq, req)) {
2010 return;
2012 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
2013 req);
2014 return;
2017 TALLOC_FREE(state->h);
2018 tevent_req_done(req);
2021 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
2023 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2024 req, struct ctdb_g_lock_unlock_state);
2025 struct ctdb_g_lock *lock;
2026 unsigned int i;
2027 int ret;
2029 for (i=0; i<state->lock_list->num; i++) {
2030 lock = &state->lock_list->lock[i];
2032 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
2033 break;
2037 if (i < state->lock_list->num) {
2038 state->lock_list->lock[i] =
2039 state->lock_list->lock[state->lock_list->num-1];
2040 state->lock_list->num -= 1;
2043 if (state->lock_list->num != 0) {
2044 TDB_DATA data;
2045 size_t np;
2047 data.dsize = ctdb_g_lock_list_len(state->lock_list);
2048 data.dptr = talloc_size(state, data.dsize);
2049 if (data.dptr == NULL) {
2050 return ENOMEM;
2053 ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
2054 ret = ctdb_store_record(state->h, data);
2055 talloc_free(data.dptr);
2056 if (ret != 0) {
2057 return ret;
2061 return 0;
2064 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2066 struct tevent_req *req = tevent_req_callback_data(
2067 subreq, struct tevent_req);
2068 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2069 req, struct ctdb_g_lock_unlock_state);
2070 int ret;
2071 bool status;
2073 status = ctdb_delete_record_recv(subreq, &ret);
2074 if (! status) {
2075 DEBUG(DEBUG_ERR,
2076 ("g_lock_unlock %s delete record failed, ret=%d\n",
2077 (char *)state->key.dptr, ret));
2078 tevent_req_error(req, ret);
2079 return;
2082 TALLOC_FREE(state->h);
2083 tevent_req_done(req);
2086 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2088 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2089 req, struct ctdb_g_lock_unlock_state);
2090 int err;
2092 TALLOC_FREE(state->h);
2094 if (tevent_req_is_unix_error(req, &err)) {
2095 if (perr != NULL) {
2096 *perr = err;
2098 return false;
2101 return true;
/*
 * Persistent database functions
 */
/* Per-request state of an asynchronous transaction start. */
struct ctdb_transaction_start_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Transaction handle returned to the caller on success */
	struct ctdb_transaction_handle *h;
	/* Value of ctdb_client_pnn(client) at the time of the request */
	uint32_t destnode;
};

static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
2118 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2119 struct tevent_context *ev,
2120 struct ctdb_client_context *client,
2121 struct timeval timeout,
2122 struct ctdb_db_context *db,
2123 bool readonly)
2125 struct ctdb_transaction_start_state *state;
2126 struct tevent_req *req, *subreq;
2127 struct ctdb_transaction_handle *h;
2129 req = tevent_req_create(mem_ctx, &state,
2130 struct ctdb_transaction_start_state);
2131 if (req == NULL) {
2132 return NULL;
2135 if (ctdb_db_volatile(db)) {
2136 tevent_req_error(req, EINVAL);
2137 return tevent_req_post(req, ev);
2140 state->ev = ev;
2141 state->client = client;
2142 state->destnode = ctdb_client_pnn(client);
2144 h = talloc_zero(db, struct ctdb_transaction_handle);
2145 if (tevent_req_nomem(h, req)) {
2146 return tevent_req_post(req, ev);
2149 h->ev = ev;
2150 h->client = client;
2151 h->db = db;
2152 h->readonly = readonly;
2153 h->updated = false;
2155 /* SRVID is unique for databases, so client can have transactions
2156 * active for multiple databases */
2157 h->sid = ctdb_client_get_server_id(client, db->db_id);
2159 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2160 if (tevent_req_nomem(h->recbuf, req)) {
2161 return tevent_req_post(req, ev);
2164 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2165 if (tevent_req_nomem(h->lock_name, req)) {
2166 return tevent_req_post(req, ev);
2169 state->h = h;
2171 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2172 if (tevent_req_nomem(subreq, req)) {
2173 return tevent_req_post(req, ev);
2175 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
2177 return req;
2180 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2182 struct tevent_req *req = tevent_req_callback_data(
2183 subreq, struct tevent_req);
2184 struct ctdb_transaction_start_state *state = tevent_req_data(
2185 req, struct ctdb_transaction_start_state);
2186 bool status;
2187 int ret;
2189 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2190 TALLOC_FREE(subreq);
2191 if (! status) {
2192 DEBUG(DEBUG_ERR,
2193 ("transaction_start: %s attach g_lock.tdb failed\n",
2194 state->h->db->db_name));
2195 tevent_req_error(req, ret);
2196 return;
2199 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2200 state->h->db_g_lock,
2201 state->h->lock_name,
2202 &state->h->sid, state->h->readonly);
2203 if (tevent_req_nomem(subreq, req)) {
2204 return;
2206 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2209 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2211 struct tevent_req *req = tevent_req_callback_data(
2212 subreq, struct tevent_req);
2213 struct ctdb_transaction_start_state *state = tevent_req_data(
2214 req, struct ctdb_transaction_start_state);
2215 int ret;
2216 bool status;
2218 status = ctdb_g_lock_lock_recv(subreq, &ret);
2219 TALLOC_FREE(subreq);
2220 if (! status) {
2221 DEBUG(DEBUG_ERR,
2222 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2223 state->h->db->db_name, ret));
2224 tevent_req_error(req, ret);
2225 return;
2228 tevent_req_done(req);
2231 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2232 struct tevent_req *req,
2233 int *perr)
2235 struct ctdb_transaction_start_state *state = tevent_req_data(
2236 req, struct ctdb_transaction_start_state);
2237 int err;
2239 if (tevent_req_is_unix_error(req, &err)) {
2240 if (perr != NULL) {
2241 *perr = err;
2243 return NULL;
2246 return state->h;
2249 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2250 struct ctdb_client_context *client,
2251 struct timeval timeout,
2252 struct ctdb_db_context *db, bool readonly,
2253 struct ctdb_transaction_handle **out)
2255 struct tevent_req *req;
2256 struct ctdb_transaction_handle *h;
2257 int ret = 0;
2259 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2260 readonly);
2261 if (req == NULL) {
2262 return ENOMEM;
2265 tevent_req_poll(req, ev);
2267 h = ctdb_transaction_start_recv(req, &ret);
2268 if (h == NULL) {
2269 return ret;
2272 *out = h;
2273 return 0;
2276 struct ctdb_transaction_record_fetch_state {
2277 TDB_DATA key, data;
2278 struct ctdb_ltdb_header header;
2279 bool found;
2282 static int ctdb_transaction_record_fetch_traverse(
2283 uint32_t reqid,
2284 struct ctdb_ltdb_header *nullheader,
2285 TDB_DATA key, TDB_DATA data,
2286 void *private_data)
2288 struct ctdb_transaction_record_fetch_state *state =
2289 (struct ctdb_transaction_record_fetch_state *)private_data;
2291 if (state->key.dsize == key.dsize &&
2292 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2293 int ret;
2295 ret = ctdb_ltdb_header_extract(&data, &state->header);
2296 if (ret != 0) {
2297 DEBUG(DEBUG_ERR,
2298 ("record_fetch: Failed to extract header, "
2299 "ret=%d\n", ret));
2300 return 1;
2303 state->data = data;
2304 state->found = true;
2307 return 0;
2310 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2311 TDB_DATA key,
2312 struct ctdb_ltdb_header *header,
2313 TDB_DATA *data)
2315 struct ctdb_transaction_record_fetch_state state;
2316 int ret;
2318 state.key = key;
2319 state.found = false;
2321 ret = ctdb_rec_buffer_traverse(h->recbuf,
2322 ctdb_transaction_record_fetch_traverse,
2323 &state);
2324 if (ret != 0) {
2325 return ret;
2328 if (state.found) {
2329 if (header != NULL) {
2330 *header = state.header;
2332 if (data != NULL) {
2333 *data = state.data;
2335 return 0;
2338 return ENOENT;
2341 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2342 TDB_DATA key,
2343 TALLOC_CTX *mem_ctx, TDB_DATA *data)
2345 TDB_DATA tmp_data;
2346 struct ctdb_ltdb_header header;
2347 int ret;
2349 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2350 if (ret == 0) {
2351 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2352 tmp_data.dsize);
2353 if (data->dptr == NULL) {
2354 return ENOMEM;
2356 data->dsize = tmp_data.dsize;
2357 return 0;
2360 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2361 if (ret != 0) {
2362 return ret;
2365 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2366 if (ret != 0) {
2367 return ret;
2370 return 0;
2373 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2374 TDB_DATA key, TDB_DATA data)
2376 TALLOC_CTX *tmp_ctx;
2377 struct ctdb_ltdb_header header;
2378 TDB_DATA old_data;
2379 int ret;
2381 if (h->readonly) {
2382 return EINVAL;
2385 tmp_ctx = talloc_new(h);
2386 if (tmp_ctx == NULL) {
2387 return ENOMEM;
2390 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2391 if (ret != 0) {
2392 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2393 if (ret != 0) {
2394 return ret;
2398 if (old_data.dsize == data.dsize &&
2399 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2400 talloc_free(tmp_ctx);
2401 return 0;
2404 header.dmaster = ctdb_client_pnn(h->client);
2405 header.rsn += 1;
2407 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2408 talloc_free(tmp_ctx);
2409 if (ret != 0) {
2410 return ret;
2412 h->updated = true;
2414 return 0;
2417 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2418 TDB_DATA key)
2420 return ctdb_transaction_store_record(h, key, tdb_null);
2423 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2424 uint64_t *seqnum)
2426 const char *keyname = CTDB_DB_SEQNUM_KEY;
2427 TDB_DATA key, data;
2428 struct ctdb_ltdb_header header;
2429 int ret;
2431 key.dptr = discard_const(keyname);
2432 key.dsize = strlen(keyname) + 1;
2434 ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2435 if (ret != 0) {
2436 DEBUG(DEBUG_ERR,
2437 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2438 h->db->db_name, ret));
2439 return ret;
2442 if (data.dsize == 0) {
2443 /* initial data */
2444 *seqnum = 0;
2445 return 0;
2448 if (data.dsize != sizeof(uint64_t)) {
2449 talloc_free(data.dptr);
2450 return EINVAL;
2453 *seqnum = *(uint64_t *)data.dptr;
2455 talloc_free(data.dptr);
2456 return 0;
2459 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2460 uint64_t seqnum)
2462 const char *keyname = CTDB_DB_SEQNUM_KEY;
2463 TDB_DATA key, data;
2465 key.dptr = discard_const(keyname);
2466 key.dsize = strlen(keyname) + 1;
2468 data.dptr = (uint8_t *)&seqnum;
2469 data.dsize = sizeof(seqnum);
2471 return ctdb_transaction_store_record(h, key, data);
/* Per-request state of an asynchronous transaction commit. */
struct ctdb_transaction_commit_state {
	struct tevent_context *ev;
	/* Timeout passed to the commit control */
	struct timeval timeout;
	/* Transaction being committed */
	struct ctdb_transaction_handle *h;
	/* Database sequence number read before the commit was sent */
	uint64_t seqnum;
};

static void ctdb_transaction_commit_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2484 struct tevent_req *ctdb_transaction_commit_send(
2485 TALLOC_CTX *mem_ctx,
2486 struct tevent_context *ev,
2487 struct timeval timeout,
2488 struct ctdb_transaction_handle *h)
2490 struct tevent_req *req, *subreq;
2491 struct ctdb_transaction_commit_state *state;
2492 struct ctdb_req_control request;
2493 int ret;
2495 req = tevent_req_create(mem_ctx, &state,
2496 struct ctdb_transaction_commit_state);
2497 if (req == NULL) {
2498 return NULL;
2501 state->ev = ev;
2502 state->timeout = timeout;
2503 state->h = h;
2505 ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2506 if (ret != 0) {
2507 tevent_req_error(req, ret);
2508 return tevent_req_post(req, ev);
2511 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2512 if (ret != 0) {
2513 tevent_req_error(req, ret);
2514 return tevent_req_post(req, ev);
2517 ctdb_req_control_trans3_commit(&request, h->recbuf);
2518 subreq = ctdb_client_control_send(state, ev, h->client,
2519 ctdb_client_pnn(h->client),
2520 timeout, &request);
2521 if (tevent_req_nomem(subreq, req)) {
2522 return tevent_req_post(req, ev);
2524 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2526 return req;
2529 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2531 struct tevent_req *req = tevent_req_callback_data(
2532 subreq, struct tevent_req);
2533 struct ctdb_transaction_commit_state *state = tevent_req_data(
2534 req, struct ctdb_transaction_commit_state);
2535 struct ctdb_transaction_handle *h = state->h;
2536 struct ctdb_reply_control *reply;
2537 uint64_t seqnum;
2538 int ret;
2539 bool status;
2541 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2542 TALLOC_FREE(subreq);
2543 if (! status) {
2544 DEBUG(DEBUG_ERR,
2545 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2546 h->db->db_name, ret));
2547 tevent_req_error(req, ret);
2548 return;
2551 ret = ctdb_reply_control_trans3_commit(reply);
2552 talloc_free(reply);
2554 if (ret != 0) {
2555 /* Control failed due to recovery */
2557 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2558 if (ret != 0) {
2559 tevent_req_error(req, ret);
2560 return;
2563 if (seqnum == state->seqnum) {
2564 struct ctdb_req_control request;
2566 /* try again */
2567 ctdb_req_control_trans3_commit(&request,
2568 state->h->recbuf);
2569 subreq = ctdb_client_control_send(
2570 state, state->ev, state->h->client,
2571 ctdb_client_pnn(state->h->client),
2572 state->timeout, &request);
2573 if (tevent_req_nomem(subreq, req)) {
2574 return;
2576 tevent_req_set_callback(subreq,
2577 ctdb_transaction_commit_done,
2578 req);
2579 return;
2582 if (seqnum != state->seqnum + 1) {
2583 DEBUG(DEBUG_ERR,
2584 ("transaction_commit: %s seqnum mismatch "
2585 "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2586 state->h->db->db_name, seqnum, state->seqnum));
2587 tevent_req_error(req, EIO);
2588 return;
2592 /* trans3_commit successful */
2593 subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2594 h->db_g_lock, h->lock_name, h->sid);
2595 if (tevent_req_nomem(subreq, req)) {
2596 return;
2598 tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2599 req);
2602 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2604 struct tevent_req *req = tevent_req_callback_data(
2605 subreq, struct tevent_req);
2606 struct ctdb_transaction_commit_state *state = tevent_req_data(
2607 req, struct ctdb_transaction_commit_state);
2608 int ret;
2609 bool status;
2611 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2612 TALLOC_FREE(subreq);
2613 if (! status) {
2614 DEBUG(DEBUG_ERR,
2615 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2616 state->h->db->db_name, ret));
2617 tevent_req_error(req, ret);
2618 return;
2621 talloc_free(state->h);
2622 tevent_req_done(req);
/*
 * Collect the result of an asynchronous transaction commit.
 *
 * Returns true if the request completed without error.  Otherwise
 * returns false and, if perr is not NULL, stores the error in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (! tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
2639 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2641 struct tevent_context *ev = h->ev;
2642 TALLOC_CTX *mem_ctx;
2643 struct tevent_req *req;
2644 int ret;
2645 bool status;
2647 if (h->readonly || ! h->updated) {
2648 return ctdb_transaction_cancel(h);
2651 mem_ctx = talloc_new(NULL);
2652 if (mem_ctx == NULL) {
2653 return ENOMEM;
2656 req = ctdb_transaction_commit_send(mem_ctx, ev,
2657 tevent_timeval_zero(), h);
2658 if (req == NULL) {
2659 talloc_free(mem_ctx);
2660 return ENOMEM;
2663 tevent_req_poll(req, ev);
2665 status = ctdb_transaction_commit_recv(req, &ret);
2666 if (! status) {
2667 talloc_free(mem_ctx);
2668 return ret;
2671 talloc_free(mem_ctx);
2672 return 0;
/*
 * State of an asynchronous transaction cancel request.
 */
struct ctdb_transaction_cancel_state {
	/* Event context used to send the unlock sub-request */
	struct tevent_context *ev;
	/* Transaction being cancelled */
	struct ctdb_transaction_handle *h;
	/* Timeout supplied by the caller of the cancel request */
	struct timeval timeout;
};

/* Completion callback for the g_lock unlock sent by a cancel */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2683 struct tevent_req *ctdb_transaction_cancel_send(
2684 TALLOC_CTX *mem_ctx,
2685 struct tevent_context *ev,
2686 struct timeval timeout,
2687 struct ctdb_transaction_handle *h)
2689 struct tevent_req *req, *subreq;
2690 struct ctdb_transaction_cancel_state *state;
2692 req = tevent_req_create(mem_ctx, &state,
2693 struct ctdb_transaction_cancel_state);
2694 if (req == NULL) {
2695 return NULL;
2698 state->ev = ev;
2699 state->h = h;
2700 state->timeout = timeout;
2702 subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2703 state->h->db_g_lock,
2704 state->h->lock_name, state->h->sid);
2705 if (tevent_req_nomem(subreq, req)) {
2706 return tevent_req_post(req, ev);
2708 tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2709 req);
2711 return req;
2714 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2716 struct tevent_req *req = tevent_req_callback_data(
2717 subreq, struct tevent_req);
2718 struct ctdb_transaction_cancel_state *state = tevent_req_data(
2719 req, struct ctdb_transaction_cancel_state);
2720 int ret;
2721 bool status;
2723 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2724 TALLOC_FREE(subreq);
2725 if (! status) {
2726 DEBUG(DEBUG_ERR,
2727 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2728 state->h->db->db_name, ret));
2729 talloc_free(state->h);
2730 tevent_req_error(req, ret);
2731 return;
2734 talloc_free(state->h);
2735 tevent_req_done(req);
/*
 * Collect the result of an asynchronous transaction cancel.
 *
 * Returns true if the request completed without error.  Otherwise
 * returns false and, if perr is not NULL, stores the error in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (! tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
2752 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2754 struct tevent_context *ev = h->ev;
2755 struct tevent_req *req;
2756 TALLOC_CTX *mem_ctx;
2757 int ret;
2758 bool status;
2760 mem_ctx = talloc_new(NULL);
2761 if (mem_ctx == NULL) {
2762 talloc_free(h);
2763 return ENOMEM;
2766 req = ctdb_transaction_cancel_send(mem_ctx, ev,
2767 tevent_timeval_zero(), h);
2768 if (req == NULL) {
2769 talloc_free(mem_ctx);
2770 talloc_free(h);
2771 return ENOMEM;
2774 tevent_req_poll(req, ev);
2776 status = ctdb_transaction_cancel_recv(req, &ret);
2777 if (! status) {
2778 talloc_free(mem_ctx);
2779 return ret;
2782 talloc_free(mem_ctx);
2783 return 0;
2787 * TODO:
2789 * In future Samba should register SERVER_ID.
2790 * Make that structure same as struct srvid {}.