source4 dsdb: Allow duplicate non local objectSIDs
[Samba.git] / ctdb / client / client_db.c
blobe86830e7bfb99009e9a7010c347c6b129121107c
1 /*
2 CTDB client code
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
42 const char *db_name)
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
48 return db;
52 return NULL;
55 static bool ctdb_db_persistent(struct ctdb_db_context *db)
57 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
58 return true;
60 return false;
63 static bool ctdb_db_replicated(struct ctdb_db_context *db)
65 if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
66 return true;
68 return false;
71 static bool ctdb_db_volatile(struct ctdb_db_context *db)
73 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT ||
74 db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
75 return false;
77 return true;
80 struct ctdb_set_db_flags_state {
81 struct tevent_context *ev;
82 struct ctdb_client_context *client;
83 struct timeval timeout;
84 uint32_t db_id;
85 uint8_t db_flags;
86 bool readonly_done, sticky_done;
87 uint32_t *pnn_list;
88 int count;
91 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
92 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
93 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
95 static struct tevent_req *ctdb_set_db_flags_send(
96 TALLOC_CTX *mem_ctx,
97 struct tevent_context *ev,
98 struct ctdb_client_context *client,
99 uint32_t destnode, struct timeval timeout,
100 uint32_t db_id, uint8_t db_flags)
102 struct tevent_req *req, *subreq;
103 struct ctdb_set_db_flags_state *state;
104 struct ctdb_req_control request;
106 req = tevent_req_create(mem_ctx, &state,
107 struct ctdb_set_db_flags_state);
108 if (req == NULL) {
109 return NULL;
112 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
113 tevent_req_done(req);
114 return tevent_req_post(req, ev);
117 state->ev = ev;
118 state->client = client;
119 state->timeout = timeout;
120 state->db_id = db_id;
121 state->db_flags = db_flags;
123 ctdb_req_control_get_nodemap(&request);
124 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
125 &request);
126 if (tevent_req_nomem(subreq, req)) {
127 return tevent_req_post(req, ev);
129 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
131 return req;
134 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
136 struct tevent_req *req = tevent_req_callback_data(
137 subreq, struct tevent_req);
138 struct ctdb_set_db_flags_state *state = tevent_req_data(
139 req, struct ctdb_set_db_flags_state);
140 struct ctdb_req_control request;
141 struct ctdb_reply_control *reply;
142 struct ctdb_node_map *nodemap;
143 int ret;
144 bool status;
146 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
147 TALLOC_FREE(subreq);
148 if (! status) {
149 DEBUG(DEBUG_ERR,
150 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
151 state->db_id, ret));
152 tevent_req_error(req, ret);
153 return;
156 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
157 talloc_free(reply);
158 if (ret != 0) {
159 DEBUG(DEBUG_ERR,
160 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
161 state->db_id, ret));
162 tevent_req_error(req, ret);
163 return;
166 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
167 state, &state->pnn_list);
168 talloc_free(nodemap);
169 if (state->count <= 0) {
170 DEBUG(DEBUG_ERR,
171 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
172 state->db_id, state->count));
173 tevent_req_error(req, ENOMEM);
174 return;
177 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
178 ctdb_req_control_set_db_readonly(&request, state->db_id);
179 subreq = ctdb_client_control_multi_send(
180 state, state->ev, state->client,
181 state->pnn_list, state->count,
182 state->timeout, &request);
183 if (tevent_req_nomem(subreq, req)) {
184 return;
186 tevent_req_set_callback(subreq,
187 ctdb_set_db_flags_readonly_done, req);
188 } else {
189 state->readonly_done = true;
192 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
193 ctdb_req_control_set_db_sticky(&request, state->db_id);
194 subreq = ctdb_client_control_multi_send(
195 state, state->ev, state->client,
196 state->pnn_list, state->count,
197 state->timeout, &request);
198 if (tevent_req_nomem(subreq, req)) {
199 return;
201 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
202 req);
203 } else {
204 state->sticky_done = true;
208 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
210 struct tevent_req *req = tevent_req_callback_data(
211 subreq, struct tevent_req);
212 struct ctdb_set_db_flags_state *state = tevent_req_data(
213 req, struct ctdb_set_db_flags_state);
214 int ret;
215 bool status;
217 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
218 NULL);
219 TALLOC_FREE(subreq);
220 if (! status) {
221 DEBUG(DEBUG_ERR,
222 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
223 state->db_id, ret));
224 tevent_req_error(req, ret);
225 return;
228 state->readonly_done = true;
230 if (state->readonly_done && state->sticky_done) {
231 tevent_req_done(req);
235 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
237 struct tevent_req *req = tevent_req_callback_data(
238 subreq, struct tevent_req);
239 struct ctdb_set_db_flags_state *state = tevent_req_data(
240 req, struct ctdb_set_db_flags_state);
241 int ret;
242 bool status;
244 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
245 NULL);
246 TALLOC_FREE(subreq);
247 if (! status) {
248 DEBUG(DEBUG_ERR,
249 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
250 state->db_id, ret));
251 tevent_req_error(req, ret);
252 return;
255 state->sticky_done = true;
257 if (state->readonly_done && state->sticky_done) {
258 tevent_req_done(req);
262 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
264 int err;
266 if (tevent_req_is_unix_error(req, &err)) {
267 if (perr != NULL) {
268 *perr = err;
270 return false;
272 return true;
275 struct ctdb_attach_state {
276 struct tevent_context *ev;
277 struct ctdb_client_context *client;
278 struct timeval timeout;
279 uint32_t destnode;
280 uint8_t db_flags;
281 struct ctdb_db_context *db;
284 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
285 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
286 static void ctdb_attach_health_done(struct tevent_req *subreq);
287 static void ctdb_attach_flags_done(struct tevent_req *subreq);
288 static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
290 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
291 struct tevent_context *ev,
292 struct ctdb_client_context *client,
293 struct timeval timeout,
294 const char *db_name, uint8_t db_flags)
296 struct tevent_req *req, *subreq;
297 struct ctdb_attach_state *state;
298 struct ctdb_req_control request;
300 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
301 if (req == NULL) {
302 return NULL;
305 state->db = client_db_handle(client, db_name);
306 if (state->db != NULL) {
307 tevent_req_done(req);
308 return tevent_req_post(req, ev);
311 state->ev = ev;
312 state->client = client;
313 state->timeout = timeout;
314 state->destnode = ctdb_client_pnn(client);
315 state->db_flags = db_flags;
317 state->db = talloc_zero(client, struct ctdb_db_context);
318 if (tevent_req_nomem(state->db, req)) {
319 return tevent_req_post(req, ev);
322 state->db->db_name = talloc_strdup(state->db, db_name);
323 if (tevent_req_nomem(state->db, req)) {
324 return tevent_req_post(req, ev);
327 state->db->db_flags = db_flags;
329 if (ctdb_db_persistent(state->db)) {
330 ctdb_req_control_db_attach_persistent(&request,
331 state->db->db_name);
332 } else if (ctdb_db_replicated(state->db)) {
333 ctdb_req_control_db_attach_replicated(&request,
334 state->db->db_name);
335 } else {
336 ctdb_req_control_db_attach(&request, state->db->db_name);
339 subreq = ctdb_client_control_send(state, state->ev, state->client,
340 state->destnode, state->timeout,
341 &request);
342 if (tevent_req_nomem(subreq, req)) {
343 return tevent_req_post(req, ev);
345 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
347 return req;
350 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
352 struct tevent_req *req = tevent_req_callback_data(
353 subreq, struct tevent_req);
354 struct ctdb_attach_state *state = tevent_req_data(
355 req, struct ctdb_attach_state);
356 struct ctdb_req_control request;
357 struct ctdb_reply_control *reply;
358 bool status;
359 int ret;
361 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
362 TALLOC_FREE(subreq);
363 if (! status) {
364 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
365 state->db->db_name,
366 (ctdb_db_persistent(state->db)
367 ? "DB_ATTACH_PERSISTENT"
368 : (ctdb_db_replicated(state->db)
369 ? "DB_ATTACH_REPLICATED"
370 : "DB_ATTACH")),
371 ret));
372 tevent_req_error(req, ret);
373 return;
376 if (ctdb_db_persistent(state->db)) {
377 ret = ctdb_reply_control_db_attach_persistent(
378 reply, &state->db->db_id);
379 } else if (ctdb_db_replicated(state->db)) {
380 ret = ctdb_reply_control_db_attach_replicated(
381 reply, &state->db->db_id);
382 } else {
383 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
385 talloc_free(reply);
386 if (ret != 0) {
387 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
388 state->db->db_name, ret));
389 tevent_req_error(req, ret);
390 return;
393 ctdb_req_control_getdbpath(&request, state->db->db_id);
394 subreq = ctdb_client_control_send(state, state->ev, state->client,
395 state->destnode, state->timeout,
396 &request);
397 if (tevent_req_nomem(subreq, req)) {
398 return;
400 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
403 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
405 struct tevent_req *req = tevent_req_callback_data(
406 subreq, struct tevent_req);
407 struct ctdb_attach_state *state = tevent_req_data(
408 req, struct ctdb_attach_state);
409 struct ctdb_reply_control *reply;
410 struct ctdb_req_control request;
411 bool status;
412 int ret;
414 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
415 TALLOC_FREE(subreq);
416 if (! status) {
417 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
418 state->db->db_name, ret));
419 tevent_req_error(req, ret);
420 return;
423 ret = ctdb_reply_control_getdbpath(reply, state->db,
424 &state->db->db_path);
425 talloc_free(reply);
426 if (ret != 0) {
427 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
428 state->db->db_name, ret));
429 tevent_req_error(req, ret);
430 return;
433 ctdb_req_control_db_get_health(&request, state->db->db_id);
434 subreq = ctdb_client_control_send(state, state->ev, state->client,
435 state->destnode, state->timeout,
436 &request);
437 if (tevent_req_nomem(subreq, req)) {
438 return;
440 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
443 static void ctdb_attach_health_done(struct tevent_req *subreq)
445 struct tevent_req *req = tevent_req_callback_data(
446 subreq, struct tevent_req);
447 struct ctdb_attach_state *state = tevent_req_data(
448 req, struct ctdb_attach_state);
449 struct ctdb_reply_control *reply;
450 const char *reason;
451 bool status;
452 int ret;
454 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
455 TALLOC_FREE(subreq);
456 if (! status) {
457 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
458 state->db->db_name, ret));
459 tevent_req_error(req, ret);
460 return;
463 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
464 if (ret != 0) {
465 DEBUG(DEBUG_ERR,
466 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
467 state->db->db_name, ret));
468 tevent_req_error(req, ret);
469 return;
472 if (reason != NULL) {
473 /* Database unhealthy, avoid attach */
474 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
475 state->db->db_name, reason));
476 tevent_req_error(req, EIO);
477 return;
480 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
481 state->destnode, state->timeout,
482 state->db->db_id, state->db_flags);
483 if (tevent_req_nomem(subreq, req)) {
484 return;
486 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
489 static void ctdb_attach_flags_done(struct tevent_req *subreq)
491 struct tevent_req *req = tevent_req_callback_data(
492 subreq, struct tevent_req);
493 struct ctdb_attach_state *state = tevent_req_data(
494 req, struct ctdb_attach_state);
495 struct ctdb_req_control request;
496 bool status;
497 int ret;
499 status = ctdb_set_db_flags_recv(subreq, &ret);
500 TALLOC_FREE(subreq);
501 if (! status) {
502 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
503 state->db->db_name, state->db_flags));
504 tevent_req_error(req, ret);
505 return;
508 ctdb_req_control_db_open_flags(&request, state->db->db_id);
509 subreq = ctdb_client_control_send(state, state->ev, state->client,
510 state->destnode, state->timeout,
511 &request);
512 if (tevent_req_nomem(subreq, req)) {
513 return;
515 tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
518 static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
520 struct tevent_req *req = tevent_req_callback_data(
521 subreq, struct tevent_req);
522 struct ctdb_attach_state *state = tevent_req_data(
523 req, struct ctdb_attach_state);
524 struct ctdb_reply_control *reply;
525 bool status;
526 int ret, tdb_flags;
528 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
529 TALLOC_FREE(subreq);
530 if (! status) {
531 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
532 state->db->db_name, ret));
533 tevent_req_error(req, ret);
534 return;
537 ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
538 talloc_free(reply);
539 if (ret != 0) {
540 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
541 " ret=%d\n", state->db->db_name, ret));
542 tevent_req_error(req, ret);
543 return;
546 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
547 tdb_flags, O_RDWR, 0);
548 if (tevent_req_nomem(state->db->ltdb, req)) {
549 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
550 state->db->db_name));
551 return;
553 DLIST_ADD(state->client->db, state->db);
555 tevent_req_done(req);
558 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
559 struct ctdb_db_context **out)
561 struct ctdb_attach_state *state = tevent_req_data(
562 req, struct ctdb_attach_state);
563 int err;
565 if (tevent_req_is_unix_error(req, &err)) {
566 if (perr != NULL) {
567 *perr = err;
569 return false;
572 if (out != NULL) {
573 *out = state->db;
575 return true;
578 int ctdb_attach(struct tevent_context *ev,
579 struct ctdb_client_context *client,
580 struct timeval timeout,
581 const char *db_name, uint8_t db_flags,
582 struct ctdb_db_context **out)
584 TALLOC_CTX *mem_ctx;
585 struct tevent_req *req;
586 bool status;
587 int ret;
589 mem_ctx = talloc_new(client);
590 if (mem_ctx == NULL) {
591 return ENOMEM;
594 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
595 db_name, db_flags);
596 if (req == NULL) {
597 talloc_free(mem_ctx);
598 return ENOMEM;
601 tevent_req_poll(req, ev);
603 status = ctdb_attach_recv(req, &ret, out);
604 if (! status) {
605 talloc_free(mem_ctx);
606 return ret;
610 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
611 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
612 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
615 talloc_free(mem_ctx);
616 return 0;
619 struct ctdb_detach_state {
620 struct ctdb_client_context *client;
621 struct tevent_context *ev;
622 struct timeval timeout;
623 uint32_t db_id;
624 const char *db_name;
627 static void ctdb_detach_dbname_done(struct tevent_req *subreq);
628 static void ctdb_detach_done(struct tevent_req *subreq);
630 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
631 struct tevent_context *ev,
632 struct ctdb_client_context *client,
633 struct timeval timeout, uint32_t db_id)
635 struct tevent_req *req, *subreq;
636 struct ctdb_detach_state *state;
637 struct ctdb_req_control request;
639 req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
640 if (req == NULL) {
641 return NULL;
644 state->client = client;
645 state->ev = ev;
646 state->timeout = timeout;
647 state->db_id = db_id;
649 ctdb_req_control_get_dbname(&request, db_id);
650 subreq = ctdb_client_control_send(state, ev, client,
651 ctdb_client_pnn(client), timeout,
652 &request);
653 if (tevent_req_nomem(subreq, req)) {
654 return tevent_req_post(req, ev);
656 tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
658 return req;
661 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
663 struct tevent_req *req = tevent_req_callback_data(
664 subreq, struct tevent_req);
665 struct ctdb_detach_state *state = tevent_req_data(
666 req, struct ctdb_detach_state);
667 struct ctdb_reply_control *reply;
668 struct ctdb_req_control request;
669 int ret;
670 bool status;
672 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
673 TALLOC_FREE(subreq);
674 if (! status) {
675 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
676 state->db_id, ret));
677 tevent_req_error(req, ret);
678 return;
681 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
682 if (ret != 0) {
683 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
684 state->db_id, ret));
685 tevent_req_error(req, ret);
686 return;
689 ctdb_req_control_db_detach(&request, state->db_id);
690 subreq = ctdb_client_control_send(state, state->ev, state->client,
691 ctdb_client_pnn(state->client),
692 state->timeout, &request);
693 if (tevent_req_nomem(subreq, req)) {
694 return;
696 tevent_req_set_callback(subreq, ctdb_detach_done, req);
700 static void ctdb_detach_done(struct tevent_req *subreq)
702 struct tevent_req *req = tevent_req_callback_data(
703 subreq, struct tevent_req);
704 struct ctdb_detach_state *state = tevent_req_data(
705 req, struct ctdb_detach_state);
706 struct ctdb_reply_control *reply;
707 struct ctdb_db_context *db;
708 int ret;
709 bool status;
711 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
712 TALLOC_FREE(subreq);
713 if (! status) {
714 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
715 state->db_name, ret));
716 tevent_req_error(req, ret);
717 return;
720 ret = ctdb_reply_control_db_detach(reply);
721 if (ret != 0) {
722 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
723 state->db_name, ret));
724 tevent_req_error(req, ret);
725 return;
728 db = client_db_handle(state->client, state->db_name);
729 if (db != NULL) {
730 DLIST_REMOVE(state->client->db, db);
731 TALLOC_FREE(db);
734 tevent_req_done(req);
737 bool ctdb_detach_recv(struct tevent_req *req, int *perr)
739 int ret;
741 if (tevent_req_is_unix_error(req, &ret)) {
742 if (perr != NULL) {
743 *perr = ret;
745 return false;
748 return true;
751 int ctdb_detach(struct tevent_context *ev,
752 struct ctdb_client_context *client,
753 struct timeval timeout, uint32_t db_id)
755 TALLOC_CTX *mem_ctx;
756 struct tevent_req *req;
757 int ret;
758 bool status;
760 mem_ctx = talloc_new(client);
761 if (mem_ctx == NULL) {
762 return ENOMEM;
765 req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
766 if (req == NULL) {
767 talloc_free(mem_ctx);
768 return ENOMEM;
771 tevent_req_poll(req, ev);
773 status = ctdb_detach_recv(req, &ret);
774 if (! status) {
775 talloc_free(mem_ctx);
776 return ret;
779 talloc_free(mem_ctx);
780 return 0;
783 uint32_t ctdb_db_id(struct ctdb_db_context *db)
785 return db->db_id;
788 struct ctdb_db_traverse_local_state {
789 ctdb_rec_parser_func_t parser;
790 void *private_data;
791 bool extract_header;
792 int error;
795 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
796 TDB_DATA key, TDB_DATA data,
797 void *private_data)
799 struct ctdb_db_traverse_local_state *state =
800 (struct ctdb_db_traverse_local_state *)private_data;
801 int ret;
803 if (state->extract_header) {
804 struct ctdb_ltdb_header header;
806 ret = ctdb_ltdb_header_extract(&data, &header);
807 if (ret != 0) {
808 state->error = ret;
809 return 1;
812 ret = state->parser(0, &header, key, data, state->private_data);
813 } else {
814 ret = state->parser(0, NULL, key, data, state->private_data);
817 if (ret != 0) {
818 state->error = ret;
819 return 1;
822 return 0;
825 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
826 bool extract_header,
827 ctdb_rec_parser_func_t parser, void *private_data)
829 struct ctdb_db_traverse_local_state state;
830 int ret;
832 state.parser = parser;
833 state.private_data = private_data;
834 state.extract_header = extract_header;
835 state.error = 0;
837 if (readonly) {
838 ret = tdb_traverse_read(db->ltdb->tdb,
839 ctdb_db_traverse_local_handler,
840 &state);
841 } else {
842 ret = tdb_traverse(db->ltdb->tdb,
843 ctdb_db_traverse_local_handler, &state);
846 if (ret == -1) {
847 return EIO;
850 return state.error;
853 struct ctdb_db_traverse_state {
854 struct tevent_context *ev;
855 struct ctdb_client_context *client;
856 struct ctdb_db_context *db;
857 uint32_t destnode;
858 uint64_t srvid;
859 struct timeval timeout;
860 ctdb_rec_parser_func_t parser;
861 void *private_data;
862 int result;
865 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
866 static void ctdb_db_traverse_started(struct tevent_req *subreq);
867 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
868 void *private_data);
869 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
870 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
872 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
873 struct tevent_context *ev,
874 struct ctdb_client_context *client,
875 struct ctdb_db_context *db,
876 uint32_t destnode,
877 struct timeval timeout,
878 ctdb_rec_parser_func_t parser,
879 void *private_data)
881 struct tevent_req *req, *subreq;
882 struct ctdb_db_traverse_state *state;
884 req = tevent_req_create(mem_ctx, &state,
885 struct ctdb_db_traverse_state);
886 if (req == NULL) {
887 return NULL;
890 state->ev = ev;
891 state->client = client;
892 state->db = db;
893 state->destnode = destnode;
894 state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
895 state->timeout = timeout;
896 state->parser = parser;
897 state->private_data = private_data;
899 subreq = ctdb_client_set_message_handler_send(state, ev, client,
900 state->srvid,
901 ctdb_db_traverse_handler,
902 req);
903 if (tevent_req_nomem(subreq, req)) {
904 return tevent_req_post(req, ev);
906 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
908 return req;
911 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
913 struct tevent_req *req = tevent_req_callback_data(
914 subreq, struct tevent_req);
915 struct ctdb_db_traverse_state *state = tevent_req_data(
916 req, struct ctdb_db_traverse_state);
917 struct ctdb_traverse_start_ext traverse;
918 struct ctdb_req_control request;
919 int ret = 0;
920 bool status;
922 status = ctdb_client_set_message_handler_recv(subreq, &ret);
923 TALLOC_FREE(subreq);
924 if (! status) {
925 tevent_req_error(req, ret);
926 return;
929 traverse = (struct ctdb_traverse_start_ext) {
930 .db_id = ctdb_db_id(state->db),
931 .reqid = 0,
932 .srvid = state->srvid,
933 .withemptyrecords = false,
936 ctdb_req_control_traverse_start_ext(&request, &traverse);
937 subreq = ctdb_client_control_send(state, state->ev, state->client,
938 state->destnode, state->timeout,
939 &request);
940 if (subreq == NULL) {
941 state->result = ENOMEM;
942 ctdb_db_traverse_remove_handler(req);
943 return;
945 tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
948 static void ctdb_db_traverse_started(struct tevent_req *subreq)
950 struct tevent_req *req = tevent_req_callback_data(
951 subreq, struct tevent_req);
952 struct ctdb_db_traverse_state *state = tevent_req_data(
953 req, struct ctdb_db_traverse_state);
954 struct ctdb_reply_control *reply;
955 int ret = 0;
956 bool status;
958 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
959 TALLOC_FREE(subreq);
960 if (! status) {
961 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
962 state->result = ret;
963 ctdb_db_traverse_remove_handler(req);
964 return;
967 ret = ctdb_reply_control_traverse_start_ext(reply);
968 talloc_free(reply);
969 if (ret != 0) {
970 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
971 ret));
972 state->result = ret;
973 ctdb_db_traverse_remove_handler(req);
974 return;
978 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
979 void *private_data)
981 struct tevent_req *req = talloc_get_type_abort(
982 private_data, struct tevent_req);
983 struct ctdb_db_traverse_state *state = tevent_req_data(
984 req, struct ctdb_db_traverse_state);
985 struct ctdb_rec_data *rec;
986 struct ctdb_ltdb_header header;
987 size_t np;
988 int ret;
990 ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np);
991 if (ret != 0) {
992 return;
995 if (rec->key.dsize == 0 && rec->data.dsize == 0) {
996 talloc_free(rec);
997 ctdb_db_traverse_remove_handler(req);
998 return;
1001 ret = ctdb_ltdb_header_extract(&rec->data, &header);
1002 if (ret != 0) {
1003 talloc_free(rec);
1004 return;
1007 if (rec->data.dsize == 0) {
1008 talloc_free(rec);
1009 return;
1012 ret = state->parser(rec->reqid, &header, rec->key, rec->data,
1013 state->private_data);
1014 talloc_free(rec);
1015 if (ret != 0) {
1016 state->result = ret;
1017 ctdb_db_traverse_remove_handler(req);
1021 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
1023 struct ctdb_db_traverse_state *state = tevent_req_data(
1024 req, struct ctdb_db_traverse_state);
1025 struct tevent_req *subreq;
1027 subreq = ctdb_client_remove_message_handler_send(state, state->ev,
1028 state->client,
1029 state->srvid, req);
1030 if (tevent_req_nomem(subreq, req)) {
1031 return;
1033 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1036 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1038 struct tevent_req *req = tevent_req_callback_data(
1039 subreq, struct tevent_req);
1040 struct ctdb_db_traverse_state *state = tevent_req_data(
1041 req, struct ctdb_db_traverse_state);
1042 int ret;
1043 bool status;
1045 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1046 TALLOC_FREE(subreq);
1047 if (! status) {
1048 tevent_req_error(req, ret);
1049 return;
1052 if (state->result != 0) {
1053 tevent_req_error(req, state->result);
1054 return;
1057 tevent_req_done(req);
1060 bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
1062 int ret;
1064 if (tevent_req_is_unix_error(req, &ret)) {
1065 if (perr != NULL) {
1066 *perr = ret;
1068 return false;
1071 return true;
1074 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1075 struct ctdb_client_context *client,
1076 struct ctdb_db_context *db,
1077 uint32_t destnode, struct timeval timeout,
1078 ctdb_rec_parser_func_t parser, void *private_data)
1080 struct tevent_req *req;
1081 int ret = 0;
1082 bool status;
1084 req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1085 timeout, parser, private_data);
1086 if (req == NULL) {
1087 return ENOMEM;
1090 tevent_req_poll(req, ev);
1092 status = ctdb_db_traverse_recv(req, &ret);
1093 if (! status) {
1094 return ret;
1097 return 0;
1100 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1101 struct ctdb_ltdb_header *header,
1102 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1104 TDB_DATA rec;
1105 size_t np;
1106 int ret;
1108 rec = tdb_fetch(db->ltdb->tdb, key);
1109 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1110 /* No record present */
1111 if (rec.dptr != NULL) {
1112 free(rec.dptr);
1115 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
1116 return EIO;
1119 *header = (struct ctdb_ltdb_header) {
1120 .dmaster = CTDB_UNKNOWN_PNN,
1123 if (data != NULL) {
1124 *data = tdb_null;
1126 return 0;
1129 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
1130 if (ret != 0) {
1131 return ret;
1134 ret = 0;
1135 if (data != NULL) {
1136 data->dsize = rec.dsize - np;
1137 data->dptr = talloc_memdup(mem_ctx, rec.dptr + np,
1138 data->dsize);
1139 if (data->dptr == NULL) {
1140 ret = ENOMEM;
1144 free(rec.dptr);
1145 return ret;
1149 * Fetch a record from volatile database
1151 * Steps:
1152 * 1. Get a lock on the hash chain
1153 * 2. If the record does not exist, migrate the record
1154 * 3. If readonly=true and delegations do not exist, migrate the record.
1155 * 4. If readonly=false and delegations exist, migrate the record.
1156 * 5. If the local node is not dmaster, migrate the record.
1157 * 6. Return record
1160 struct ctdb_fetch_lock_state {
1161 struct tevent_context *ev;
1162 struct ctdb_client_context *client;
1163 struct ctdb_record_handle *h;
1164 bool readonly;
1165 uint32_t pnn;
1168 static int ctdb_fetch_lock_check(struct tevent_req *req);
1169 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1170 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1172 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1173 struct tevent_context *ev,
1174 struct ctdb_client_context *client,
1175 struct ctdb_db_context *db,
1176 TDB_DATA key, bool readonly)
1178 struct ctdb_fetch_lock_state *state;
1179 struct tevent_req *req;
1180 int ret;
1182 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1183 if (req == NULL) {
1184 return NULL;
1187 state->ev = ev;
1188 state->client = client;
1190 state->h = talloc_zero(db, struct ctdb_record_handle);
1191 if (tevent_req_nomem(state->h, req)) {
1192 return tevent_req_post(req, ev);
1194 state->h->client = client;
1195 state->h->db = db;
1196 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1197 if (tevent_req_nomem(state->h->key.dptr, req)) {
1198 return tevent_req_post(req, ev);
1200 state->h->key.dsize = key.dsize;
1201 state->h->readonly = false;
1203 state->readonly = readonly;
1204 state->pnn = ctdb_client_pnn(client);
1206 /* Check that database is not persistent */
1207 if (! ctdb_db_volatile(db)) {
1208 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1209 db->db_name));
1210 tevent_req_error(req, EINVAL);
1211 return tevent_req_post(req, ev);
1214 ret = ctdb_fetch_lock_check(req);
1215 if (ret == 0) {
1216 tevent_req_done(req);
1217 return tevent_req_post(req, ev);
1219 if (ret != EAGAIN) {
1220 tevent_req_error(req, ret);
1221 return tevent_req_post(req, ev);
1223 return req;
1226 static int ctdb_fetch_lock_check(struct tevent_req *req)
1228 struct ctdb_fetch_lock_state *state = tevent_req_data(
1229 req, struct ctdb_fetch_lock_state);
1230 struct ctdb_record_handle *h = state->h;
1231 struct ctdb_ltdb_header header;
1232 TDB_DATA data = tdb_null;
1233 size_t np;
1234 int ret, err = 0;
1235 bool do_migrate = false;
1237 ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1238 if (ret != 0) {
1239 DEBUG(DEBUG_ERR,
1240 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1241 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1242 err = EIO;
1243 goto failed;
1246 data = tdb_fetch(h->db->ltdb->tdb, h->key);
1247 if (data.dptr == NULL) {
1248 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1249 goto migrate;
1250 } else {
1251 err = EIO;
1252 goto failed;
1256 /* Got the record */
1257 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np);
1258 if (ret != 0) {
1259 err = ret;
1260 goto failed;
1263 if (! state->readonly) {
1264 /* Read/write access */
1265 if (header.dmaster == state->pnn &&
1266 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1267 goto migrate;
1270 if (header.dmaster != state->pnn) {
1271 goto migrate;
1273 } else {
1274 /* Readonly access */
1275 if (header.dmaster != state->pnn &&
1276 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1277 CTDB_REC_RO_HAVE_DELEGATIONS))) {
1278 goto migrate;
1282 /* We are the dmaster or readonly delegation */
1283 h->header = header;
1284 h->data = data;
1285 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1286 CTDB_REC_RO_HAVE_DELEGATIONS)) {
1287 h->readonly = true;
1289 return 0;
1291 migrate:
1292 do_migrate = true;
1293 err = EAGAIN;
1295 failed:
1296 if (data.dptr != NULL) {
1297 free(data.dptr);
1299 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1300 if (ret != 0) {
1301 DEBUG(DEBUG_ERR,
1302 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1303 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1304 return EIO;
1307 if (do_migrate) {
1308 ctdb_fetch_lock_migrate(req);
1310 return err;
1313 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1315 struct ctdb_fetch_lock_state *state = tevent_req_data(
1316 req, struct ctdb_fetch_lock_state);
1317 struct ctdb_req_call request;
1318 struct tevent_req *subreq;
1320 ZERO_STRUCT(request);
1321 request.flags = CTDB_IMMEDIATE_MIGRATION;
1322 if (state->readonly) {
1323 request.flags |= CTDB_WANT_READONLY;
1325 request.db_id = state->h->db->db_id;
1326 request.callid = CTDB_NULL_FUNC;
1327 request.key = state->h->key;
1328 request.calldata = tdb_null;
1330 subreq = ctdb_client_call_send(state, state->ev, state->client,
1331 &request);
1332 if (tevent_req_nomem(subreq, req)) {
1333 return;
1336 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1339 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1341 struct tevent_req *req = tevent_req_callback_data(
1342 subreq, struct tevent_req);
1343 struct ctdb_fetch_lock_state *state = tevent_req_data(
1344 req, struct ctdb_fetch_lock_state);
1345 struct ctdb_reply_call *reply;
1346 int ret;
1347 bool status;
1349 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1350 TALLOC_FREE(subreq);
1351 if (! status) {
1352 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1353 state->h->db->db_name, ret));
1354 tevent_req_error(req, ret);
1355 return;
1358 if (reply->status != 0) {
1359 tevent_req_error(req, EIO);
1360 return;
1362 talloc_free(reply);
1364 ret = ctdb_fetch_lock_check(req);
1365 if (ret != 0) {
1366 if (ret != EAGAIN) {
1367 tevent_req_error(req, ret);
1369 return;
1372 tevent_req_done(req);
1375 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1377 int ret;
1379 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1380 if (ret != 0) {
1381 DEBUG(DEBUG_ERR,
1382 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1383 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1385 free(h->data.dptr);
1386 return 0;
1389 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1390 struct ctdb_ltdb_header *header,
1391 TALLOC_CTX *mem_ctx,
1392 TDB_DATA *data, int *perr)
1394 struct ctdb_fetch_lock_state *state = tevent_req_data(
1395 req, struct ctdb_fetch_lock_state);
1396 struct ctdb_record_handle *h = state->h;
1397 int err;
1399 if (tevent_req_is_unix_error(req, &err)) {
1400 if (perr != NULL) {
1401 TALLOC_FREE(state->h);
1402 *perr = err;
1404 return NULL;
1407 if (header != NULL) {
1408 *header = h->header;
1410 if (data != NULL) {
1411 size_t offset;
1413 offset = ctdb_ltdb_header_len(&h->header);
1415 data->dsize = h->data.dsize - offset;
1416 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1417 data->dsize);
1418 if (data->dptr == NULL) {
1419 TALLOC_FREE(state->h);
1420 if (perr != NULL) {
1421 *perr = ENOMEM;
1423 return NULL;
1427 talloc_set_destructor(h, ctdb_record_handle_destructor);
1428 return h;
1431 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1432 struct ctdb_client_context *client,
1433 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1434 struct ctdb_record_handle **out,
1435 struct ctdb_ltdb_header *header, TDB_DATA *data)
1437 struct tevent_req *req;
1438 struct ctdb_record_handle *h;
1439 int ret;
1441 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1442 if (req == NULL) {
1443 return ENOMEM;
1446 tevent_req_poll(req, ev);
1448 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1449 if (h == NULL) {
1450 return ret;
1453 *out = h;
1454 return 0;
1457 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1459 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1460 TDB_DATA rec[2];
1461 size_t np;
1462 int ret;
1464 /* Cannot modify the record if it was obtained as a readonly copy */
1465 if (h->readonly) {
1466 return EINVAL;
1469 /* Check if the new data is same */
1470 if (h->data.dsize == data.dsize &&
1471 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1472 /* No need to do anything */
1473 return 0;
1476 ctdb_ltdb_header_push(&h->header, header, &np);
1478 rec[0].dsize = np;
1479 rec[0].dptr = header;
1481 rec[1].dsize = data.dsize;
1482 rec[1].dptr = data.dptr;
1484 ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1485 if (ret != 0) {
1486 DEBUG(DEBUG_ERR,
1487 ("store_record: %s tdb_storev failed, %s\n",
1488 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1489 return EIO;
1492 return 0;
1495 struct ctdb_delete_record_state {
1496 struct ctdb_record_handle *h;
1499 static void ctdb_delete_record_done(struct tevent_req *subreq);
1501 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1502 struct tevent_context *ev,
1503 struct ctdb_record_handle *h)
1505 struct tevent_req *req, *subreq;
1506 struct ctdb_delete_record_state *state;
1507 struct ctdb_key_data key;
1508 struct ctdb_req_control request;
1509 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1510 TDB_DATA rec;
1511 size_t np;
1512 int ret;
1514 req = tevent_req_create(mem_ctx, &state,
1515 struct ctdb_delete_record_state);
1516 if (req == NULL) {
1517 return NULL;
1520 state->h = h;
1522 /* Cannot delete the record if it was obtained as a readonly copy */
1523 if (h->readonly) {
1524 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1525 h->db->db_name));
1526 tevent_req_error(req, EINVAL);
1527 return tevent_req_post(req, ev);
1530 ctdb_ltdb_header_push(&h->header, header, &np);
1532 rec.dsize = np;
1533 rec.dptr = header;
1535 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1536 if (ret != 0) {
1537 DEBUG(DEBUG_ERR,
1538 ("fetch_lock delete: %s tdb_sore failed, %s\n",
1539 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1540 tevent_req_error(req, EIO);
1541 return tevent_req_post(req, ev);
1544 key.db_id = h->db->db_id;
1545 key.header = h->header;
1546 key.key = h->key;
1548 ctdb_req_control_schedule_for_deletion(&request, &key);
1549 subreq = ctdb_client_control_send(state, ev, h->client,
1550 ctdb_client_pnn(h->client),
1551 tevent_timeval_zero(),
1552 &request);
1553 if (tevent_req_nomem(subreq, req)) {
1554 return tevent_req_post(req, ev);
1556 tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1558 return req;
1561 static void ctdb_delete_record_done(struct tevent_req *subreq)
1563 struct tevent_req *req = tevent_req_callback_data(
1564 subreq, struct tevent_req);
1565 struct ctdb_delete_record_state *state = tevent_req_data(
1566 req, struct ctdb_delete_record_state);
1567 int ret;
1568 bool status;
1570 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1571 TALLOC_FREE(subreq);
1572 if (! status) {
1573 DEBUG(DEBUG_ERR,
1574 ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1575 "ret=%d\n", state->h->db->db_name, ret));
1576 tevent_req_error(req, ret);
1577 return;
1580 tevent_req_done(req);
1583 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1585 int err;
1587 if (tevent_req_is_unix_error(req, &err)) {
1588 if (perr != NULL) {
1589 *perr = err;
1591 return false;
1594 return true;
1598 int ctdb_delete_record(struct ctdb_record_handle *h)
1600 struct tevent_context *ev = h->ev;
1601 TALLOC_CTX *mem_ctx;
1602 struct tevent_req *req;
1603 int ret;
1604 bool status;
1606 mem_ctx = talloc_new(NULL);
1607 if (mem_ctx == NULL) {
1608 return ENOMEM;
1611 req = ctdb_delete_record_send(mem_ctx, ev, h);
1612 if (req == NULL) {
1613 talloc_free(mem_ctx);
1614 return ENOMEM;
1617 tevent_req_poll(req, ev);
1619 status = ctdb_delete_record_recv(req, &ret);
1620 talloc_free(mem_ctx);
1621 if (! status) {
1622 return ret;
1625 return 0;
1629 * Global lock functions
1632 struct ctdb_g_lock_lock_state {
1633 struct tevent_context *ev;
1634 struct ctdb_client_context *client;
1635 struct ctdb_db_context *db;
1636 TDB_DATA key;
1637 struct ctdb_server_id my_sid;
1638 enum ctdb_g_lock_type lock_type;
1639 struct ctdb_record_handle *h;
1640 /* state for verification of active locks */
1641 struct ctdb_g_lock_list *lock_list;
1642 unsigned int current;
1645 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1646 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1647 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1648 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1649 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1651 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1652 enum ctdb_g_lock_type l2)
1654 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1655 return false;
1657 return true;
1660 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1661 struct tevent_context *ev,
1662 struct ctdb_client_context *client,
1663 struct ctdb_db_context *db,
1664 const char *keyname,
1665 struct ctdb_server_id *sid,
1666 bool readonly)
1668 struct tevent_req *req, *subreq;
1669 struct ctdb_g_lock_lock_state *state;
1671 req = tevent_req_create(mem_ctx, &state,
1672 struct ctdb_g_lock_lock_state);
1673 if (req == NULL) {
1674 return NULL;
1677 state->ev = ev;
1678 state->client = client;
1679 state->db = db;
1680 state->key.dptr = discard_const(keyname);
1681 state->key.dsize = strlen(keyname) + 1;
1682 state->my_sid = *sid;
1683 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1685 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1686 false);
1687 if (tevent_req_nomem(subreq, req)) {
1688 return tevent_req_post(req, ev);
1690 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1692 return req;
1695 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1697 struct tevent_req *req = tevent_req_callback_data(
1698 subreq, struct tevent_req);
1699 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1700 req, struct ctdb_g_lock_lock_state);
1701 TDB_DATA data;
1702 size_t np;
1703 int ret = 0;
1705 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1706 TALLOC_FREE(subreq);
1707 if (state->h == NULL) {
1708 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1709 (char *)state->key.dptr));
1710 tevent_req_error(req, ret);
1711 return;
1714 if (state->lock_list != NULL) {
1715 TALLOC_FREE(state->lock_list);
1716 state->current = 0;
1719 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1720 &state->lock_list, &np);
1721 talloc_free(data.dptr);
1722 if (ret != 0) {
1723 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1724 (char *)state->key.dptr));
1725 tevent_req_error(req, ret);
1726 return;
1729 ctdb_g_lock_lock_process_locks(req);
1732 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1734 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1735 req, struct ctdb_g_lock_lock_state);
1736 struct tevent_req *subreq;
1737 struct ctdb_g_lock *lock;
1738 bool check_server = false;
1739 int ret;
1741 while (state->current < state->lock_list->num) {
1742 lock = &state->lock_list->lock[state->current];
1744 /* We should not ask for the same lock more than once */
1745 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1746 DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1747 (char *)state->key.dptr));
1748 tevent_req_error(req, EDEADLK);
1749 return;
1752 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1753 check_server = true;
1754 break;
1757 state->current += 1;
1760 if (check_server) {
1761 struct ctdb_req_control request;
1763 ctdb_req_control_process_exists(&request, lock->sid.pid);
1764 subreq = ctdb_client_control_send(state, state->ev,
1765 state->client,
1766 lock->sid.vnn,
1767 tevent_timeval_zero(),
1768 &request);
1769 if (tevent_req_nomem(subreq, req)) {
1770 return;
1772 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1773 return;
1776 /* There is no conflict, add ourself to the lock_list */
1777 state->lock_list->lock = talloc_realloc(state->lock_list,
1778 state->lock_list->lock,
1779 struct ctdb_g_lock,
1780 state->lock_list->num + 1);
1781 if (state->lock_list->lock == NULL) {
1782 tevent_req_error(req, ENOMEM);
1783 return;
1786 lock = &state->lock_list->lock[state->lock_list->num];
1787 lock->type = state->lock_type;
1788 lock->sid = state->my_sid;
1789 state->lock_list->num += 1;
1791 ret = ctdb_g_lock_lock_update(req);
1792 if (ret != 0) {
1793 tevent_req_error(req, ret);
1794 return;
1797 TALLOC_FREE(state->h);
1798 tevent_req_done(req);
1801 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1803 struct tevent_req *req = tevent_req_callback_data(
1804 subreq, struct tevent_req);
1805 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1806 req, struct ctdb_g_lock_lock_state);
1807 struct ctdb_reply_control *reply;
1808 int ret, value;
1809 bool status;
1811 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1812 TALLOC_FREE(subreq);
1813 if (! status) {
1814 DEBUG(DEBUG_ERR,
1815 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1816 (char *)state->key.dptr, ret));
1817 tevent_req_error(req, ret);
1818 return;
1821 ret = ctdb_reply_control_process_exists(reply, &value);
1822 if (ret != 0) {
1823 tevent_req_error(req, ret);
1824 return;
1826 talloc_free(reply);
1828 if (value == 0) {
1829 /* server process exists, need to retry */
1830 TALLOC_FREE(state->h);
1831 subreq = tevent_wakeup_send(state, state->ev,
1832 tevent_timeval_current_ofs(0,1000));
1833 if (tevent_req_nomem(subreq, req)) {
1834 return;
1836 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1837 return;
1840 /* server process does not exist, remove conflicting entry */
1841 state->lock_list->lock[state->current] =
1842 state->lock_list->lock[state->lock_list->num-1];
1843 state->lock_list->num -= 1;
1845 ret = ctdb_g_lock_lock_update(req);
1846 if (ret != 0) {
1847 tevent_req_error(req, ret);
1848 return;
1851 ctdb_g_lock_lock_process_locks(req);
1854 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1856 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1857 req, struct ctdb_g_lock_lock_state);
1858 TDB_DATA data;
1859 size_t np;
1860 int ret;
1862 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1863 data.dptr = talloc_size(state, data.dsize);
1864 if (data.dptr == NULL) {
1865 return ENOMEM;
1868 ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
1869 ret = ctdb_store_record(state->h, data);
1870 talloc_free(data.dptr);
1871 return ret;
1874 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1876 struct tevent_req *req = tevent_req_callback_data(
1877 subreq, struct tevent_req);
1878 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1879 req, struct ctdb_g_lock_lock_state);
1880 bool success;
1882 success = tevent_wakeup_recv(subreq);
1883 TALLOC_FREE(subreq);
1884 if (! success) {
1885 tevent_req_error(req, ENOMEM);
1886 return;
1889 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1890 state->db, state->key, false);
1891 if (tevent_req_nomem(subreq, req)) {
1892 return;
1894 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1897 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1899 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1900 req, struct ctdb_g_lock_lock_state);
1901 int err;
1903 TALLOC_FREE(state->h);
1905 if (tevent_req_is_unix_error(req, &err)) {
1906 if (perr != NULL) {
1907 *perr = err;
1909 return false;
1912 return true;
1915 struct ctdb_g_lock_unlock_state {
1916 struct tevent_context *ev;
1917 struct ctdb_client_context *client;
1918 struct ctdb_db_context *db;
1919 TDB_DATA key;
1920 struct ctdb_server_id my_sid;
1921 struct ctdb_record_handle *h;
1922 struct ctdb_g_lock_list *lock_list;
1925 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1926 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1927 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1929 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1930 struct tevent_context *ev,
1931 struct ctdb_client_context *client,
1932 struct ctdb_db_context *db,
1933 const char *keyname,
1934 struct ctdb_server_id sid)
1936 struct tevent_req *req, *subreq;
1937 struct ctdb_g_lock_unlock_state *state;
1939 req = tevent_req_create(mem_ctx, &state,
1940 struct ctdb_g_lock_unlock_state);
1941 if (req == NULL) {
1942 return NULL;
1945 state->ev = ev;
1946 state->client = client;
1947 state->db = db;
1948 state->key.dptr = discard_const(keyname);
1949 state->key.dsize = strlen(keyname) + 1;
1950 state->my_sid = sid;
1952 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1953 false);
1954 if (tevent_req_nomem(subreq, req)) {
1955 return tevent_req_post(req, ev);
1957 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1959 return req;
1962 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1964 struct tevent_req *req = tevent_req_callback_data(
1965 subreq, struct tevent_req);
1966 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1967 req, struct ctdb_g_lock_unlock_state);
1968 TDB_DATA data;
1969 size_t np;
1970 int ret = 0;
1972 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1973 TALLOC_FREE(subreq);
1974 if (state->h == NULL) {
1975 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1976 (char *)state->key.dptr));
1977 tevent_req_error(req, ret);
1978 return;
1981 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1982 &state->lock_list, &np);
1983 if (ret != 0) {
1984 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1985 (char *)state->key.dptr));
1986 tevent_req_error(req, ret);
1987 return;
1990 ret = ctdb_g_lock_unlock_update(req);
1991 if (ret != 0) {
1992 tevent_req_error(req, ret);
1993 return;
1996 if (state->lock_list->num == 0) {
1997 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1998 if (tevent_req_nomem(subreq, req)) {
1999 return;
2001 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
2002 req);
2003 return;
2006 TALLOC_FREE(state->h);
2007 tevent_req_done(req);
2010 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
2012 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2013 req, struct ctdb_g_lock_unlock_state);
2014 struct ctdb_g_lock *lock;
2015 int ret, i;
2017 for (i=0; i<state->lock_list->num; i++) {
2018 lock = &state->lock_list->lock[i];
2020 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
2021 break;
2025 if (i < state->lock_list->num) {
2026 state->lock_list->lock[i] =
2027 state->lock_list->lock[state->lock_list->num-1];
2028 state->lock_list->num -= 1;
2031 if (state->lock_list->num != 0) {
2032 TDB_DATA data;
2033 size_t np;
2035 data.dsize = ctdb_g_lock_list_len(state->lock_list);
2036 data.dptr = talloc_size(state, data.dsize);
2037 if (data.dptr == NULL) {
2038 return ENOMEM;
2041 ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
2042 ret = ctdb_store_record(state->h, data);
2043 talloc_free(data.dptr);
2044 if (ret != 0) {
2045 return ret;
2049 return 0;
2052 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2054 struct tevent_req *req = tevent_req_callback_data(
2055 subreq, struct tevent_req);
2056 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2057 req, struct ctdb_g_lock_unlock_state);
2058 int ret;
2059 bool status;
2061 status = ctdb_delete_record_recv(subreq, &ret);
2062 if (! status) {
2063 DEBUG(DEBUG_ERR,
2064 ("g_lock_unlock %s delete record failed, ret=%d\n",
2065 (char *)state->key.dptr, ret));
2066 tevent_req_error(req, ret);
2067 return;
2070 TALLOC_FREE(state->h);
2071 tevent_req_done(req);
2074 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2076 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2077 req, struct ctdb_g_lock_unlock_state);
2078 int err;
2080 TALLOC_FREE(state->h);
2082 if (tevent_req_is_unix_error(req, &err)) {
2083 if (perr != NULL) {
2084 *perr = err;
2086 return false;
2089 return true;
2093 * Persistent database functions
2095 struct ctdb_transaction_start_state {
2096 struct tevent_context *ev;
2097 struct ctdb_client_context *client;
2098 struct timeval timeout;
2099 struct ctdb_transaction_handle *h;
2100 uint32_t destnode;
2103 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
2104 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
2106 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2107 struct tevent_context *ev,
2108 struct ctdb_client_context *client,
2109 struct timeval timeout,
2110 struct ctdb_db_context *db,
2111 bool readonly)
2113 struct ctdb_transaction_start_state *state;
2114 struct tevent_req *req, *subreq;
2115 struct ctdb_transaction_handle *h;
2117 req = tevent_req_create(mem_ctx, &state,
2118 struct ctdb_transaction_start_state);
2119 if (req == NULL) {
2120 return NULL;
2123 if (ctdb_db_volatile(db)) {
2124 tevent_req_error(req, EINVAL);
2125 return tevent_req_post(req, ev);
2128 state->ev = ev;
2129 state->client = client;
2130 state->destnode = ctdb_client_pnn(client);
2132 h = talloc_zero(db, struct ctdb_transaction_handle);
2133 if (tevent_req_nomem(h, req)) {
2134 return tevent_req_post(req, ev);
2137 h->ev = ev;
2138 h->client = client;
2139 h->db = db;
2140 h->readonly = readonly;
2141 h->updated = false;
2143 /* SRVID is unique for databases, so client can have transactions
2144 * active for multiple databases */
2145 h->sid = ctdb_client_get_server_id(client, db->db_id);
2147 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2148 if (tevent_req_nomem(h->recbuf, req)) {
2149 return tevent_req_post(req, ev);
2152 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2153 if (tevent_req_nomem(h->lock_name, req)) {
2154 return tevent_req_post(req, ev);
2157 state->h = h;
2159 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2160 if (tevent_req_nomem(subreq, req)) {
2161 return tevent_req_post(req, ev);
2163 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
2165 return req;
2168 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2170 struct tevent_req *req = tevent_req_callback_data(
2171 subreq, struct tevent_req);
2172 struct ctdb_transaction_start_state *state = tevent_req_data(
2173 req, struct ctdb_transaction_start_state);
2174 bool status;
2175 int ret;
2177 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2178 TALLOC_FREE(subreq);
2179 if (! status) {
2180 DEBUG(DEBUG_ERR,
2181 ("transaction_start: %s attach g_lock.tdb failed\n",
2182 state->h->db->db_name));
2183 tevent_req_error(req, ret);
2184 return;
2187 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2188 state->h->db_g_lock,
2189 state->h->lock_name,
2190 &state->h->sid, state->h->readonly);
2191 if (tevent_req_nomem(subreq, req)) {
2192 return;
2194 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2197 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2199 struct tevent_req *req = tevent_req_callback_data(
2200 subreq, struct tevent_req);
2201 struct ctdb_transaction_start_state *state = tevent_req_data(
2202 req, struct ctdb_transaction_start_state);
2203 int ret;
2204 bool status;
2206 status = ctdb_g_lock_lock_recv(subreq, &ret);
2207 TALLOC_FREE(subreq);
2208 if (! status) {
2209 DEBUG(DEBUG_ERR,
2210 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2211 state->h->db->db_name, ret));
2212 tevent_req_error(req, ret);
2213 return;
2216 tevent_req_done(req);
2219 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2220 struct tevent_req *req,
2221 int *perr)
2223 struct ctdb_transaction_start_state *state = tevent_req_data(
2224 req, struct ctdb_transaction_start_state);
2225 int err;
2227 if (tevent_req_is_unix_error(req, &err)) {
2228 if (perr != NULL) {
2229 *perr = err;
2231 return NULL;
2234 return state->h;
2237 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2238 struct ctdb_client_context *client,
2239 struct timeval timeout,
2240 struct ctdb_db_context *db, bool readonly,
2241 struct ctdb_transaction_handle **out)
2243 struct tevent_req *req;
2244 struct ctdb_transaction_handle *h;
2245 int ret;
2247 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2248 readonly);
2249 if (req == NULL) {
2250 return ENOMEM;
2253 tevent_req_poll(req, ev);
2255 h = ctdb_transaction_start_recv(req, &ret);
2256 if (h == NULL) {
2257 return ret;
2260 *out = h;
2261 return 0;
2264 struct ctdb_transaction_record_fetch_state {
2265 TDB_DATA key, data;
2266 struct ctdb_ltdb_header header;
2267 bool found;
2270 static int ctdb_transaction_record_fetch_traverse(
2271 uint32_t reqid,
2272 struct ctdb_ltdb_header *nullheader,
2273 TDB_DATA key, TDB_DATA data,
2274 void *private_data)
2276 struct ctdb_transaction_record_fetch_state *state =
2277 (struct ctdb_transaction_record_fetch_state *)private_data;
2279 if (state->key.dsize == key.dsize &&
2280 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2281 int ret;
2283 ret = ctdb_ltdb_header_extract(&data, &state->header);
2284 if (ret != 0) {
2285 DEBUG(DEBUG_ERR,
2286 ("record_fetch: Failed to extract header, "
2287 "ret=%d\n", ret));
2288 return 1;
2291 state->data = data;
2292 state->found = true;
2295 return 0;
2298 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2299 TDB_DATA key,
2300 struct ctdb_ltdb_header *header,
2301 TDB_DATA *data)
2303 struct ctdb_transaction_record_fetch_state state;
2304 int ret;
2306 state.key = key;
2307 state.found = false;
2309 ret = ctdb_rec_buffer_traverse(h->recbuf,
2310 ctdb_transaction_record_fetch_traverse,
2311 &state);
2312 if (ret != 0) {
2313 return ret;
2316 if (state.found) {
2317 if (header != NULL) {
2318 *header = state.header;
2320 if (data != NULL) {
2321 *data = state.data;
2323 return 0;
2326 return ENOENT;
2329 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2330 TDB_DATA key,
2331 TALLOC_CTX *mem_ctx, TDB_DATA *data)
2333 TDB_DATA tmp_data;
2334 struct ctdb_ltdb_header header;
2335 int ret;
2337 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2338 if (ret == 0) {
2339 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2340 tmp_data.dsize);
2341 if (data->dptr == NULL) {
2342 return ENOMEM;
2344 data->dsize = tmp_data.dsize;
2345 return 0;
2348 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2349 if (ret != 0) {
2350 return ret;
2353 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2354 if (ret != 0) {
2355 return ret;
2358 return 0;
2361 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2362 TDB_DATA key, TDB_DATA data)
2364 TALLOC_CTX *tmp_ctx;
2365 struct ctdb_ltdb_header header;
2366 TDB_DATA old_data;
2367 int ret;
2369 if (h->readonly) {
2370 return EINVAL;
2373 tmp_ctx = talloc_new(h);
2374 if (tmp_ctx == NULL) {
2375 return ENOMEM;
2378 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2379 if (ret != 0) {
2380 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2381 if (ret != 0) {
2382 return ret;
2386 if (old_data.dsize == data.dsize &&
2387 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2388 talloc_free(tmp_ctx);
2389 return 0;
2392 header.dmaster = ctdb_client_pnn(h->client);
2393 header.rsn += 1;
2395 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2396 talloc_free(tmp_ctx);
2397 if (ret != 0) {
2398 return ret;
2400 h->updated = true;
2402 return 0;
2405 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2406 TDB_DATA key)
2408 return ctdb_transaction_store_record(h, key, tdb_null);
2411 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2412 uint64_t *seqnum)
2414 const char *keyname = CTDB_DB_SEQNUM_KEY;
2415 TDB_DATA key, data;
2416 struct ctdb_ltdb_header header;
2417 int ret;
2419 key.dptr = discard_const(keyname);
2420 key.dsize = strlen(keyname) + 1;
2422 ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2423 if (ret != 0) {
2424 DEBUG(DEBUG_ERR,
2425 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2426 h->db->db_name, ret));
2427 return ret;
2430 if (data.dsize == 0) {
2431 /* initial data */
2432 *seqnum = 0;
2433 return 0;
2436 if (data.dsize != sizeof(uint64_t)) {
2437 talloc_free(data.dptr);
2438 return EINVAL;
2441 *seqnum = *(uint64_t *)data.dptr;
2443 talloc_free(data.dptr);
2444 return 0;
2447 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2448 uint64_t seqnum)
2450 const char *keyname = CTDB_DB_SEQNUM_KEY;
2451 TDB_DATA key, data;
2453 key.dptr = discard_const(keyname);
2454 key.dsize = strlen(keyname) + 1;
2456 data.dptr = (uint8_t *)&seqnum;
2457 data.dsize = sizeof(seqnum);
2459 return ctdb_transaction_store_record(h, key, data);
2462 struct ctdb_transaction_commit_state {
2463 struct tevent_context *ev;
2464 struct timeval timeout;
2465 struct ctdb_transaction_handle *h;
2466 uint64_t seqnum;
2469 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2470 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2472 struct tevent_req *ctdb_transaction_commit_send(
2473 TALLOC_CTX *mem_ctx,
2474 struct tevent_context *ev,
2475 struct timeval timeout,
2476 struct ctdb_transaction_handle *h)
2478 struct tevent_req *req, *subreq;
2479 struct ctdb_transaction_commit_state *state;
2480 struct ctdb_req_control request;
2481 int ret;
2483 req = tevent_req_create(mem_ctx, &state,
2484 struct ctdb_transaction_commit_state);
2485 if (req == NULL) {
2486 return NULL;
2489 state->ev = ev;
2490 state->timeout = timeout;
2491 state->h = h;
2493 ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2494 if (ret != 0) {
2495 tevent_req_error(req, ret);
2496 return tevent_req_post(req, ev);
2499 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2500 if (ret != 0) {
2501 tevent_req_error(req, ret);
2502 return tevent_req_post(req, ev);
2505 ctdb_req_control_trans3_commit(&request, h->recbuf);
2506 subreq = ctdb_client_control_send(state, ev, h->client,
2507 ctdb_client_pnn(h->client),
2508 timeout, &request);
2509 if (tevent_req_nomem(subreq, req)) {
2510 return tevent_req_post(req, ev);
2512 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2514 return req;
2517 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2519 struct tevent_req *req = tevent_req_callback_data(
2520 subreq, struct tevent_req);
2521 struct ctdb_transaction_commit_state *state = tevent_req_data(
2522 req, struct ctdb_transaction_commit_state);
2523 struct ctdb_transaction_handle *h = state->h;
2524 struct ctdb_reply_control *reply;
2525 uint64_t seqnum;
2526 int ret;
2527 bool status;
2529 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2530 TALLOC_FREE(subreq);
2531 if (! status) {
2532 DEBUG(DEBUG_ERR,
2533 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2534 h->db->db_name, ret));
2535 tevent_req_error(req, ret);
2536 return;
2539 ret = ctdb_reply_control_trans3_commit(reply);
2540 talloc_free(reply);
2542 if (ret != 0) {
2543 /* Control failed due to recovery */
2545 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2546 if (ret != 0) {
2547 tevent_req_error(req, ret);
2548 return;
2551 if (seqnum == state->seqnum) {
2552 struct ctdb_req_control request;
2554 /* try again */
2555 ctdb_req_control_trans3_commit(&request,
2556 state->h->recbuf);
2557 subreq = ctdb_client_control_send(
2558 state, state->ev, state->h->client,
2559 ctdb_client_pnn(state->h->client),
2560 state->timeout, &request);
2561 if (tevent_req_nomem(subreq, req)) {
2562 return;
2564 tevent_req_set_callback(subreq,
2565 ctdb_transaction_commit_done,
2566 req);
2567 return;
2570 if (seqnum != state->seqnum + 1) {
2571 DEBUG(DEBUG_ERR,
2572 ("transaction_commit: %s seqnum mismatch "
2573 "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2574 state->h->db->db_name, seqnum, state->seqnum));
2575 tevent_req_error(req, EIO);
2576 return;
2580 /* trans3_commit successful */
2581 subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2582 h->db_g_lock, h->lock_name, h->sid);
2583 if (tevent_req_nomem(subreq, req)) {
2584 return;
2586 tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2587 req);
2590 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2592 struct tevent_req *req = tevent_req_callback_data(
2593 subreq, struct tevent_req);
2594 struct ctdb_transaction_commit_state *state = tevent_req_data(
2595 req, struct ctdb_transaction_commit_state);
2596 int ret;
2597 bool status;
2599 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2600 TALLOC_FREE(subreq);
2601 if (! status) {
2602 DEBUG(DEBUG_ERR,
2603 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2604 state->h->db->db_name, ret));
2605 tevent_req_error(req, ret);
2606 return;
2609 talloc_free(state->h);
2610 tevent_req_done(req);
2613 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2615 int err;
2617 if (tevent_req_is_unix_error(req, &err)) {
2618 if (perr != NULL) {
2619 *perr = err;
2621 return false;
2624 return true;
2627 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2629 struct tevent_context *ev = h->ev;
2630 TALLOC_CTX *mem_ctx;
2631 struct tevent_req *req;
2632 int ret;
2633 bool status;
2635 if (h->readonly || ! h->updated) {
2636 return ctdb_transaction_cancel(h);
2639 mem_ctx = talloc_new(NULL);
2640 if (mem_ctx == NULL) {
2641 return ENOMEM;
2644 req = ctdb_transaction_commit_send(mem_ctx, ev,
2645 tevent_timeval_zero(), h);
2646 if (req == NULL) {
2647 talloc_free(mem_ctx);
2648 return ENOMEM;
2651 tevent_req_poll(req, ev);
2653 status = ctdb_transaction_commit_recv(req, &ret);
2654 if (! status) {
2655 talloc_free(mem_ctx);
2656 return ret;
2659 talloc_free(mem_ctx);
2660 return 0;
2663 struct ctdb_transaction_cancel_state {
2664 struct tevent_context *ev;
2665 struct ctdb_transaction_handle *h;
2666 struct timeval timeout;
2669 static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2671 struct tevent_req *ctdb_transaction_cancel_send(
2672 TALLOC_CTX *mem_ctx,
2673 struct tevent_context *ev,
2674 struct timeval timeout,
2675 struct ctdb_transaction_handle *h)
2677 struct tevent_req *req, *subreq;
2678 struct ctdb_transaction_cancel_state *state;
2680 req = tevent_req_create(mem_ctx, &state,
2681 struct ctdb_transaction_cancel_state);
2682 if (req == NULL) {
2683 return NULL;
2686 state->ev = ev;
2687 state->h = h;
2688 state->timeout = timeout;
2690 subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2691 state->h->db_g_lock,
2692 state->h->lock_name, state->h->sid);
2693 if (tevent_req_nomem(subreq, req)) {
2694 return tevent_req_post(req, ev);
2696 tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2697 req);
2699 return req;
2702 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2704 struct tevent_req *req = tevent_req_callback_data(
2705 subreq, struct tevent_req);
2706 struct ctdb_transaction_cancel_state *state = tevent_req_data(
2707 req, struct ctdb_transaction_cancel_state);
2708 int ret;
2709 bool status;
2711 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2712 TALLOC_FREE(subreq);
2713 if (! status) {
2714 DEBUG(DEBUG_ERR,
2715 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2716 state->h->db->db_name, ret));
2717 talloc_free(state->h);
2718 tevent_req_error(req, ret);
2719 return;
2722 talloc_free(state->h);
2723 tevent_req_done(req);
2726 bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
2728 int err;
2730 if (tevent_req_is_unix_error(req, &err)) {
2731 if (perr != NULL) {
2732 *perr = err;
2734 return false;
2737 return true;
2740 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2742 struct tevent_context *ev = h->ev;
2743 struct tevent_req *req;
2744 TALLOC_CTX *mem_ctx;
2745 int ret;
2746 bool status;
2748 mem_ctx = talloc_new(NULL);
2749 if (mem_ctx == NULL) {
2750 talloc_free(h);
2751 return ENOMEM;
2754 req = ctdb_transaction_cancel_send(mem_ctx, ev,
2755 tevent_timeval_zero(), h);
2756 if (req == NULL) {
2757 talloc_free(mem_ctx);
2758 talloc_free(h);
2759 return ENOMEM;
2762 tevent_req_poll(req, ev);
2764 status = ctdb_transaction_cancel_recv(req, &ret);
2765 if (! status) {
2766 talloc_free(mem_ctx);
2767 return ret;
2770 talloc_free(mem_ctx);
2771 return 0;
2775 * TODO:
2777 * In future Samba should register SERVER_ID.
2778 * Make that structure same as struct srvid {}.