s3:winbindd: Use #ifdef instead of #if for config.h definitions
[Samba.git] / ctdb / client / client_db.c
blobc0ecdeb2d307dc57b2be5ca29e71e1421b6215fb
1 /*
2 CTDB client code
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
42 const char *db_name)
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
48 return db;
52 return NULL;
55 static bool ctdb_db_persistent(struct ctdb_db_context *db)
57 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
58 return true;
60 return false;
63 static bool ctdb_db_replicated(struct ctdb_db_context *db)
65 if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
66 return true;
68 return false;
71 static bool ctdb_db_volatile(struct ctdb_db_context *db)
73 if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT ||
74 db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
75 return false;
77 return true;
80 struct ctdb_set_db_flags_state {
81 struct tevent_context *ev;
82 struct ctdb_client_context *client;
83 struct timeval timeout;
84 uint32_t db_id;
85 uint8_t db_flags;
86 bool readonly_done, sticky_done;
87 uint32_t *pnn_list;
88 int count;
91 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
92 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
93 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
95 static struct tevent_req *ctdb_set_db_flags_send(
96 TALLOC_CTX *mem_ctx,
97 struct tevent_context *ev,
98 struct ctdb_client_context *client,
99 uint32_t destnode, struct timeval timeout,
100 uint32_t db_id, uint8_t db_flags)
102 struct tevent_req *req, *subreq;
103 struct ctdb_set_db_flags_state *state;
104 struct ctdb_req_control request;
106 req = tevent_req_create(mem_ctx, &state,
107 struct ctdb_set_db_flags_state);
108 if (req == NULL) {
109 return NULL;
112 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
113 tevent_req_done(req);
114 return tevent_req_post(req, ev);
117 state->ev = ev;
118 state->client = client;
119 state->timeout = timeout;
120 state->db_id = db_id;
121 state->db_flags = db_flags;
123 ctdb_req_control_get_nodemap(&request);
124 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
125 &request);
126 if (tevent_req_nomem(subreq, req)) {
127 return tevent_req_post(req, ev);
129 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
131 return req;
134 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
136 struct tevent_req *req = tevent_req_callback_data(
137 subreq, struct tevent_req);
138 struct ctdb_set_db_flags_state *state = tevent_req_data(
139 req, struct ctdb_set_db_flags_state);
140 struct ctdb_req_control request;
141 struct ctdb_reply_control *reply;
142 struct ctdb_node_map *nodemap;
143 int ret;
144 bool status;
146 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
147 TALLOC_FREE(subreq);
148 if (! status) {
149 DEBUG(DEBUG_ERR,
150 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
151 state->db_id, ret));
152 tevent_req_error(req, ret);
153 return;
156 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
157 talloc_free(reply);
158 if (ret != 0) {
159 DEBUG(DEBUG_ERR,
160 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
161 state->db_id, ret));
162 tevent_req_error(req, ret);
163 return;
166 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
167 state, &state->pnn_list);
168 talloc_free(nodemap);
169 if (state->count <= 0) {
170 DEBUG(DEBUG_ERR,
171 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
172 state->db_id, state->count));
173 tevent_req_error(req, ENOMEM);
174 return;
177 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
178 ctdb_req_control_set_db_readonly(&request, state->db_id);
179 subreq = ctdb_client_control_multi_send(
180 state, state->ev, state->client,
181 state->pnn_list, state->count,
182 state->timeout, &request);
183 if (tevent_req_nomem(subreq, req)) {
184 return;
186 tevent_req_set_callback(subreq,
187 ctdb_set_db_flags_readonly_done, req);
188 } else {
189 state->readonly_done = true;
192 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
193 ctdb_req_control_set_db_sticky(&request, state->db_id);
194 subreq = ctdb_client_control_multi_send(
195 state, state->ev, state->client,
196 state->pnn_list, state->count,
197 state->timeout, &request);
198 if (tevent_req_nomem(subreq, req)) {
199 return;
201 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
202 req);
203 } else {
204 state->sticky_done = true;
208 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
210 struct tevent_req *req = tevent_req_callback_data(
211 subreq, struct tevent_req);
212 struct ctdb_set_db_flags_state *state = tevent_req_data(
213 req, struct ctdb_set_db_flags_state);
214 int ret;
215 bool status;
217 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
218 NULL);
219 TALLOC_FREE(subreq);
220 if (! status) {
221 DEBUG(DEBUG_ERR,
222 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
223 state->db_id, ret));
224 tevent_req_error(req, ret);
225 return;
228 state->readonly_done = true;
230 if (state->readonly_done && state->sticky_done) {
231 tevent_req_done(req);
235 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
237 struct tevent_req *req = tevent_req_callback_data(
238 subreq, struct tevent_req);
239 struct ctdb_set_db_flags_state *state = tevent_req_data(
240 req, struct ctdb_set_db_flags_state);
241 int ret;
242 bool status;
244 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
245 NULL);
246 TALLOC_FREE(subreq);
247 if (! status) {
248 DEBUG(DEBUG_ERR,
249 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
250 state->db_id, ret));
251 tevent_req_error(req, ret);
252 return;
255 state->sticky_done = true;
257 if (state->readonly_done && state->sticky_done) {
258 tevent_req_done(req);
262 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
264 int err;
266 if (tevent_req_is_unix_error(req, &err)) {
267 if (perr != NULL) {
268 *perr = err;
270 return false;
272 return true;
275 struct ctdb_attach_state {
276 struct tevent_context *ev;
277 struct ctdb_client_context *client;
278 struct timeval timeout;
279 uint32_t destnode;
280 uint8_t db_flags;
281 struct ctdb_db_context *db;
284 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
285 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
286 static void ctdb_attach_health_done(struct tevent_req *subreq);
287 static void ctdb_attach_flags_done(struct tevent_req *subreq);
288 static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
290 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
291 struct tevent_context *ev,
292 struct ctdb_client_context *client,
293 struct timeval timeout,
294 const char *db_name, uint8_t db_flags)
296 struct tevent_req *req, *subreq;
297 struct ctdb_attach_state *state;
298 struct ctdb_req_control request;
300 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
301 if (req == NULL) {
302 return NULL;
305 state->db = client_db_handle(client, db_name);
306 if (state->db != NULL) {
307 tevent_req_done(req);
308 return tevent_req_post(req, ev);
311 state->ev = ev;
312 state->client = client;
313 state->timeout = timeout;
314 state->destnode = ctdb_client_pnn(client);
315 state->db_flags = db_flags;
317 state->db = talloc_zero(client, struct ctdb_db_context);
318 if (tevent_req_nomem(state->db, req)) {
319 return tevent_req_post(req, ev);
322 state->db->db_name = talloc_strdup(state->db, db_name);
323 if (tevent_req_nomem(state->db, req)) {
324 return tevent_req_post(req, ev);
327 state->db->db_flags = db_flags;
329 if (ctdb_db_persistent(state->db)) {
330 ctdb_req_control_db_attach_persistent(&request,
331 state->db->db_name);
332 } else if (ctdb_db_replicated(state->db)) {
333 ctdb_req_control_db_attach_replicated(&request,
334 state->db->db_name);
335 } else {
336 ctdb_req_control_db_attach(&request, state->db->db_name);
339 subreq = ctdb_client_control_send(state, state->ev, state->client,
340 state->destnode, state->timeout,
341 &request);
342 if (tevent_req_nomem(subreq, req)) {
343 return tevent_req_post(req, ev);
345 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
347 return req;
350 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
352 struct tevent_req *req = tevent_req_callback_data(
353 subreq, struct tevent_req);
354 struct ctdb_attach_state *state = tevent_req_data(
355 req, struct ctdb_attach_state);
356 struct ctdb_req_control request;
357 struct ctdb_reply_control *reply;
358 bool status;
359 int ret;
361 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
362 TALLOC_FREE(subreq);
363 if (! status) {
364 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
365 state->db->db_name,
366 (ctdb_db_persistent(state->db)
367 ? "DB_ATTACH_PERSISTENT"
368 : (ctdb_db_replicated(state->db)
369 ? "DB_ATTACH_REPLICATED"
370 : "DB_ATTACH")),
371 ret));
372 tevent_req_error(req, ret);
373 return;
376 if (ctdb_db_persistent(state->db)) {
377 ret = ctdb_reply_control_db_attach_persistent(
378 reply, &state->db->db_id);
379 } else if (ctdb_db_replicated(state->db)) {
380 ret = ctdb_reply_control_db_attach_replicated(
381 reply, &state->db->db_id);
382 } else {
383 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
385 talloc_free(reply);
386 if (ret != 0) {
387 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
388 state->db->db_name, ret));
389 tevent_req_error(req, ret);
390 return;
393 ctdb_req_control_getdbpath(&request, state->db->db_id);
394 subreq = ctdb_client_control_send(state, state->ev, state->client,
395 state->destnode, state->timeout,
396 &request);
397 if (tevent_req_nomem(subreq, req)) {
398 return;
400 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
403 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
405 struct tevent_req *req = tevent_req_callback_data(
406 subreq, struct tevent_req);
407 struct ctdb_attach_state *state = tevent_req_data(
408 req, struct ctdb_attach_state);
409 struct ctdb_reply_control *reply;
410 struct ctdb_req_control request;
411 bool status;
412 int ret;
414 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
415 TALLOC_FREE(subreq);
416 if (! status) {
417 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
418 state->db->db_name, ret));
419 tevent_req_error(req, ret);
420 return;
423 ret = ctdb_reply_control_getdbpath(reply, state->db,
424 &state->db->db_path);
425 talloc_free(reply);
426 if (ret != 0) {
427 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
428 state->db->db_name, ret));
429 tevent_req_error(req, ret);
430 return;
433 ctdb_req_control_db_get_health(&request, state->db->db_id);
434 subreq = ctdb_client_control_send(state, state->ev, state->client,
435 state->destnode, state->timeout,
436 &request);
437 if (tevent_req_nomem(subreq, req)) {
438 return;
440 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
443 static void ctdb_attach_health_done(struct tevent_req *subreq)
445 struct tevent_req *req = tevent_req_callback_data(
446 subreq, struct tevent_req);
447 struct ctdb_attach_state *state = tevent_req_data(
448 req, struct ctdb_attach_state);
449 struct ctdb_reply_control *reply;
450 const char *reason;
451 bool status;
452 int ret;
454 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
455 TALLOC_FREE(subreq);
456 if (! status) {
457 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
458 state->db->db_name, ret));
459 tevent_req_error(req, ret);
460 return;
463 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
464 if (ret != 0) {
465 DEBUG(DEBUG_ERR,
466 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
467 state->db->db_name, ret));
468 tevent_req_error(req, ret);
469 return;
472 if (reason != NULL) {
473 /* Database unhealthy, avoid attach */
474 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
475 state->db->db_name, reason));
476 tevent_req_error(req, EIO);
477 return;
480 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
481 state->destnode, state->timeout,
482 state->db->db_id, state->db_flags);
483 if (tevent_req_nomem(subreq, req)) {
484 return;
486 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
489 static void ctdb_attach_flags_done(struct tevent_req *subreq)
491 struct tevent_req *req = tevent_req_callback_data(
492 subreq, struct tevent_req);
493 struct ctdb_attach_state *state = tevent_req_data(
494 req, struct ctdb_attach_state);
495 struct ctdb_req_control request;
496 bool status;
497 int ret;
499 status = ctdb_set_db_flags_recv(subreq, &ret);
500 TALLOC_FREE(subreq);
501 if (! status) {
502 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
503 state->db->db_name, state->db_flags));
504 tevent_req_error(req, ret);
505 return;
508 ctdb_req_control_db_open_flags(&request, state->db->db_id);
509 subreq = ctdb_client_control_send(state, state->ev, state->client,
510 state->destnode, state->timeout,
511 &request);
512 if (tevent_req_nomem(subreq, req)) {
513 return;
515 tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
518 static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
520 struct tevent_req *req = tevent_req_callback_data(
521 subreq, struct tevent_req);
522 struct ctdb_attach_state *state = tevent_req_data(
523 req, struct ctdb_attach_state);
524 struct ctdb_reply_control *reply;
525 bool status;
526 int ret, tdb_flags;
528 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
529 TALLOC_FREE(subreq);
530 if (! status) {
531 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
532 state->db->db_name, ret));
533 tevent_req_error(req, ret);
534 return;
537 ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
538 talloc_free(reply);
539 if (ret != 0) {
540 DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
541 " ret=%d\n", state->db->db_name, ret));
542 tevent_req_error(req, ret);
543 return;
546 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
547 tdb_flags, O_RDWR, 0);
548 if (tevent_req_nomem(state->db->ltdb, req)) {
549 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
550 state->db->db_name));
551 return;
553 DLIST_ADD(state->client->db, state->db);
555 tevent_req_done(req);
558 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
559 struct ctdb_db_context **out)
561 struct ctdb_attach_state *state = tevent_req_data(
562 req, struct ctdb_attach_state);
563 int err;
565 if (tevent_req_is_unix_error(req, &err)) {
566 if (perr != NULL) {
567 *perr = err;
569 return false;
572 if (out != NULL) {
573 *out = state->db;
575 return true;
578 int ctdb_attach(struct tevent_context *ev,
579 struct ctdb_client_context *client,
580 struct timeval timeout,
581 const char *db_name, uint8_t db_flags,
582 struct ctdb_db_context **out)
584 TALLOC_CTX *mem_ctx;
585 struct tevent_req *req;
586 bool status;
587 int ret;
589 mem_ctx = talloc_new(client);
590 if (mem_ctx == NULL) {
591 return ENOMEM;
594 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
595 db_name, db_flags);
596 if (req == NULL) {
597 talloc_free(mem_ctx);
598 return ENOMEM;
601 tevent_req_poll(req, ev);
603 status = ctdb_attach_recv(req, &ret, out);
604 if (! status) {
605 talloc_free(mem_ctx);
606 return ret;
610 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
611 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
612 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
615 talloc_free(mem_ctx);
616 return 0;
619 struct ctdb_detach_state {
620 struct ctdb_client_context *client;
621 struct tevent_context *ev;
622 struct timeval timeout;
623 uint32_t db_id;
624 const char *db_name;
627 static void ctdb_detach_dbname_done(struct tevent_req *subreq);
628 static void ctdb_detach_done(struct tevent_req *subreq);
630 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
631 struct tevent_context *ev,
632 struct ctdb_client_context *client,
633 struct timeval timeout, uint32_t db_id)
635 struct tevent_req *req, *subreq;
636 struct ctdb_detach_state *state;
637 struct ctdb_req_control request;
639 req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
640 if (req == NULL) {
641 return NULL;
644 state->client = client;
645 state->ev = ev;
646 state->timeout = timeout;
647 state->db_id = db_id;
649 ctdb_req_control_get_dbname(&request, db_id);
650 subreq = ctdb_client_control_send(state, ev, client,
651 ctdb_client_pnn(client), timeout,
652 &request);
653 if (tevent_req_nomem(subreq, req)) {
654 return tevent_req_post(req, ev);
656 tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
658 return req;
661 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
663 struct tevent_req *req = tevent_req_callback_data(
664 subreq, struct tevent_req);
665 struct ctdb_detach_state *state = tevent_req_data(
666 req, struct ctdb_detach_state);
667 struct ctdb_reply_control *reply;
668 struct ctdb_req_control request;
669 int ret;
670 bool status;
672 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
673 TALLOC_FREE(subreq);
674 if (! status) {
675 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
676 state->db_id, ret));
677 tevent_req_error(req, ret);
678 return;
681 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
682 if (ret != 0) {
683 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
684 state->db_id, ret));
685 tevent_req_error(req, ret);
686 return;
689 ctdb_req_control_db_detach(&request, state->db_id);
690 subreq = ctdb_client_control_send(state, state->ev, state->client,
691 ctdb_client_pnn(state->client),
692 state->timeout, &request);
693 if (tevent_req_nomem(subreq, req)) {
694 return;
696 tevent_req_set_callback(subreq, ctdb_detach_done, req);
700 static void ctdb_detach_done(struct tevent_req *subreq)
702 struct tevent_req *req = tevent_req_callback_data(
703 subreq, struct tevent_req);
704 struct ctdb_detach_state *state = tevent_req_data(
705 req, struct ctdb_detach_state);
706 struct ctdb_reply_control *reply;
707 struct ctdb_db_context *db;
708 int ret;
709 bool status;
711 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
712 TALLOC_FREE(subreq);
713 if (! status) {
714 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
715 state->db_name, ret));
716 tevent_req_error(req, ret);
717 return;
720 ret = ctdb_reply_control_db_detach(reply);
721 if (ret != 0) {
722 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
723 state->db_name, ret));
724 tevent_req_error(req, ret);
725 return;
728 db = client_db_handle(state->client, state->db_name);
729 if (db != NULL) {
730 DLIST_REMOVE(state->client->db, db);
731 TALLOC_FREE(db);
734 tevent_req_done(req);
737 bool ctdb_detach_recv(struct tevent_req *req, int *perr)
739 int ret;
741 if (tevent_req_is_unix_error(req, &ret)) {
742 if (perr != NULL) {
743 *perr = ret;
745 return false;
748 return true;
751 int ctdb_detach(struct tevent_context *ev,
752 struct ctdb_client_context *client,
753 struct timeval timeout, uint32_t db_id)
755 TALLOC_CTX *mem_ctx;
756 struct tevent_req *req;
757 int ret;
758 bool status;
760 mem_ctx = talloc_new(client);
761 if (mem_ctx == NULL) {
762 return ENOMEM;
765 req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
766 if (req == NULL) {
767 talloc_free(mem_ctx);
768 return ENOMEM;
771 tevent_req_poll(req, ev);
773 status = ctdb_detach_recv(req, &ret);
774 if (! status) {
775 talloc_free(mem_ctx);
776 return ret;
779 talloc_free(mem_ctx);
780 return 0;
783 uint32_t ctdb_db_id(struct ctdb_db_context *db)
785 return db->db_id;
788 struct ctdb_db_traverse_local_state {
789 ctdb_rec_parser_func_t parser;
790 void *private_data;
791 bool extract_header;
792 int error;
795 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
796 TDB_DATA key, TDB_DATA data,
797 void *private_data)
799 struct ctdb_db_traverse_local_state *state =
800 (struct ctdb_db_traverse_local_state *)private_data;
801 int ret;
803 if (state->extract_header) {
804 struct ctdb_ltdb_header header;
806 ret = ctdb_ltdb_header_extract(&data, &header);
807 if (ret != 0) {
808 state->error = ret;
809 return 1;
812 ret = state->parser(0, &header, key, data, state->private_data);
813 } else {
814 ret = state->parser(0, NULL, key, data, state->private_data);
817 if (ret != 0) {
818 state->error = ret;
819 return 1;
822 return 0;
825 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
826 bool extract_header,
827 ctdb_rec_parser_func_t parser, void *private_data)
829 struct ctdb_db_traverse_local_state state;
830 int ret;
832 state.parser = parser;
833 state.private_data = private_data;
834 state.extract_header = extract_header;
835 state.error = 0;
837 if (readonly) {
838 ret = tdb_traverse_read(db->ltdb->tdb,
839 ctdb_db_traverse_local_handler,
840 &state);
841 } else {
842 ret = tdb_traverse(db->ltdb->tdb,
843 ctdb_db_traverse_local_handler, &state);
846 if (ret == -1) {
847 return EIO;
850 return state.error;
853 struct ctdb_db_traverse_state {
854 struct tevent_context *ev;
855 struct ctdb_client_context *client;
856 struct ctdb_db_context *db;
857 uint32_t destnode;
858 uint64_t srvid;
859 struct timeval timeout;
860 ctdb_rec_parser_func_t parser;
861 void *private_data;
862 int result;
865 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
866 static void ctdb_db_traverse_started(struct tevent_req *subreq);
867 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
868 void *private_data);
869 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
870 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
872 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
873 struct tevent_context *ev,
874 struct ctdb_client_context *client,
875 struct ctdb_db_context *db,
876 uint32_t destnode,
877 struct timeval timeout,
878 ctdb_rec_parser_func_t parser,
879 void *private_data)
881 struct tevent_req *req, *subreq;
882 struct ctdb_db_traverse_state *state;
884 req = tevent_req_create(mem_ctx, &state,
885 struct ctdb_db_traverse_state);
886 if (req == NULL) {
887 return NULL;
890 state->ev = ev;
891 state->client = client;
892 state->db = db;
893 state->destnode = destnode;
894 state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
895 state->timeout = timeout;
896 state->parser = parser;
897 state->private_data = private_data;
899 subreq = ctdb_client_set_message_handler_send(state, ev, client,
900 state->srvid,
901 ctdb_db_traverse_handler,
902 req);
903 if (tevent_req_nomem(subreq, req)) {
904 return tevent_req_post(req, ev);
906 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
908 return req;
911 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
913 struct tevent_req *req = tevent_req_callback_data(
914 subreq, struct tevent_req);
915 struct ctdb_db_traverse_state *state = tevent_req_data(
916 req, struct ctdb_db_traverse_state);
917 struct ctdb_traverse_start_ext traverse;
918 struct ctdb_req_control request;
919 int ret = 0;
920 bool status;
922 status = ctdb_client_set_message_handler_recv(subreq, &ret);
923 TALLOC_FREE(subreq);
924 if (! status) {
925 tevent_req_error(req, ret);
926 return;
929 traverse = (struct ctdb_traverse_start_ext) {
930 .db_id = ctdb_db_id(state->db),
931 .reqid = 0,
932 .srvid = state->srvid,
933 .withemptyrecords = false,
936 ctdb_req_control_traverse_start_ext(&request, &traverse);
937 subreq = ctdb_client_control_send(state, state->ev, state->client,
938 state->destnode, state->timeout,
939 &request);
940 if (subreq == NULL) {
941 state->result = ENOMEM;
942 ctdb_db_traverse_remove_handler(req);
943 return;
945 tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
948 static void ctdb_db_traverse_started(struct tevent_req *subreq)
950 struct tevent_req *req = tevent_req_callback_data(
951 subreq, struct tevent_req);
952 struct ctdb_db_traverse_state *state = tevent_req_data(
953 req, struct ctdb_db_traverse_state);
954 struct ctdb_reply_control *reply;
955 int ret = 0;
956 bool status;
958 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
959 TALLOC_FREE(subreq);
960 if (! status) {
961 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
962 state->result = ret;
963 ctdb_db_traverse_remove_handler(req);
964 return;
967 ret = ctdb_reply_control_traverse_start_ext(reply);
968 talloc_free(reply);
969 if (ret != 0) {
970 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
971 ret));
972 state->result = ret;
973 ctdb_db_traverse_remove_handler(req);
974 return;
978 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
979 void *private_data)
981 struct tevent_req *req = talloc_get_type_abort(
982 private_data, struct tevent_req);
983 struct ctdb_db_traverse_state *state = tevent_req_data(
984 req, struct ctdb_db_traverse_state);
985 struct ctdb_rec_data *rec;
986 struct ctdb_ltdb_header header;
987 size_t np;
988 int ret;
990 ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec, &np);
991 if (ret != 0) {
992 return;
995 if (rec->key.dsize == 0 && rec->data.dsize == 0) {
996 talloc_free(rec);
997 ctdb_db_traverse_remove_handler(req);
998 return;
1001 ret = ctdb_ltdb_header_extract(&rec->data, &header);
1002 if (ret != 0) {
1003 talloc_free(rec);
1004 return;
1007 if (rec->data.dsize == 0) {
1008 talloc_free(rec);
1009 return;
1012 ret = state->parser(rec->reqid, &header, rec->key, rec->data,
1013 state->private_data);
1014 talloc_free(rec);
1015 if (ret != 0) {
1016 state->result = ret;
1017 ctdb_db_traverse_remove_handler(req);
1021 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
1023 struct ctdb_db_traverse_state *state = tevent_req_data(
1024 req, struct ctdb_db_traverse_state);
1025 struct tevent_req *subreq;
1027 subreq = ctdb_client_remove_message_handler_send(state, state->ev,
1028 state->client,
1029 state->srvid, req);
1030 if (tevent_req_nomem(subreq, req)) {
1031 return;
1033 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1036 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1038 struct tevent_req *req = tevent_req_callback_data(
1039 subreq, struct tevent_req);
1040 struct ctdb_db_traverse_state *state = tevent_req_data(
1041 req, struct ctdb_db_traverse_state);
1042 int ret;
1043 bool status;
1045 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1046 TALLOC_FREE(subreq);
1047 if (! status) {
1048 tevent_req_error(req, ret);
1049 return;
1052 if (state->result != 0) {
1053 tevent_req_error(req, state->result);
1054 return;
1057 tevent_req_done(req);
1060 bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
1062 int ret;
1064 if (tevent_req_is_unix_error(req, &ret)) {
1065 if (perr != NULL) {
1066 *perr = ret;
1068 return false;
1071 return true;
1074 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1075 struct ctdb_client_context *client,
1076 struct ctdb_db_context *db,
1077 uint32_t destnode, struct timeval timeout,
1078 ctdb_rec_parser_func_t parser, void *private_data)
1080 struct tevent_req *req;
1081 int ret = 0;
1082 bool status;
1084 req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1085 timeout, parser, private_data);
1086 if (req == NULL) {
1087 return ENOMEM;
1090 tevent_req_poll(req, ev);
1092 status = ctdb_db_traverse_recv(req, &ret);
1093 if (! status) {
1094 return ret;
1097 return 0;
1100 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1101 struct ctdb_ltdb_header *header,
1102 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1104 TDB_DATA rec;
1105 size_t np;
1106 int ret;
1108 rec = tdb_fetch(db->ltdb->tdb, key);
1109 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1110 /* No record present */
1111 if (rec.dptr != NULL) {
1112 free(rec.dptr);
1115 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
1116 return EIO;
1119 *header = (struct ctdb_ltdb_header) {
1120 .dmaster = CTDB_UNKNOWN_PNN,
1123 if (data != NULL) {
1124 *data = tdb_null;
1126 return 0;
1129 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
1130 if (ret != 0) {
1131 return ret;
1134 ret = 0;
1135 if (data != NULL) {
1136 data->dsize = rec.dsize - np;
1137 data->dptr = talloc_memdup(mem_ctx, rec.dptr + np,
1138 data->dsize);
1139 if (data->dptr == NULL) {
1140 ret = ENOMEM;
1144 free(rec.dptr);
1145 return ret;
1149 * Fetch a record from volatile database
1151 * Steps:
1152 * 1. Get a lock on the hash chain
1153 * 2. If the record does not exist, migrate the record
1154 * 3. If readonly=true and delegations do not exist, migrate the record.
1155 * 4. If readonly=false and delegations exist, migrate the record.
1156 * 5. If the local node is not dmaster, migrate the record.
1157 * 6. Return record
1160 struct ctdb_fetch_lock_state {
1161 struct tevent_context *ev;
1162 struct ctdb_client_context *client;
1163 struct ctdb_record_handle *h;
1164 bool readonly;
1165 uint32_t pnn;
1168 static int ctdb_fetch_lock_check(struct tevent_req *req);
1169 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1170 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1172 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1173 struct tevent_context *ev,
1174 struct ctdb_client_context *client,
1175 struct ctdb_db_context *db,
1176 TDB_DATA key, bool readonly)
1178 struct ctdb_fetch_lock_state *state;
1179 struct tevent_req *req;
1180 int ret;
1182 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1183 if (req == NULL) {
1184 return NULL;
1187 state->ev = ev;
1188 state->client = client;
1190 state->h = talloc_zero(db, struct ctdb_record_handle);
1191 if (tevent_req_nomem(state->h, req)) {
1192 return tevent_req_post(req, ev);
1194 state->h->ev = ev;
1195 state->h->client = client;
1196 state->h->db = db;
1197 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1198 if (tevent_req_nomem(state->h->key.dptr, req)) {
1199 return tevent_req_post(req, ev);
1201 state->h->key.dsize = key.dsize;
1202 state->h->readonly = false;
1204 state->readonly = readonly;
1205 state->pnn = ctdb_client_pnn(client);
1207 /* Check that database is not persistent */
1208 if (! ctdb_db_volatile(db)) {
1209 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1210 db->db_name));
1211 tevent_req_error(req, EINVAL);
1212 return tevent_req_post(req, ev);
1215 ret = ctdb_fetch_lock_check(req);
1216 if (ret == 0) {
1217 tevent_req_done(req);
1218 return tevent_req_post(req, ev);
1220 if (ret != EAGAIN) {
1221 tevent_req_error(req, ret);
1222 return tevent_req_post(req, ev);
1224 return req;
1227 static int ctdb_fetch_lock_check(struct tevent_req *req)
1229 struct ctdb_fetch_lock_state *state = tevent_req_data(
1230 req, struct ctdb_fetch_lock_state);
1231 struct ctdb_record_handle *h = state->h;
1232 struct ctdb_ltdb_header header;
1233 TDB_DATA data = tdb_null;
1234 size_t np;
1235 int ret, err = 0;
1236 bool do_migrate = false;
1238 ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1239 if (ret != 0) {
1240 DEBUG(DEBUG_ERR,
1241 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1242 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1243 err = EIO;
1244 goto failed;
1247 data = tdb_fetch(h->db->ltdb->tdb, h->key);
1248 if (data.dptr == NULL) {
1249 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1250 goto migrate;
1251 } else {
1252 err = EIO;
1253 goto failed;
1257 /* Got the record */
1258 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np);
1259 if (ret != 0) {
1260 err = ret;
1261 goto failed;
1264 if (! state->readonly) {
1265 /* Read/write access */
1266 if (header.dmaster == state->pnn &&
1267 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1268 goto migrate;
1271 if (header.dmaster != state->pnn) {
1272 goto migrate;
1274 } else {
1275 /* Readonly access */
1276 if (header.dmaster != state->pnn &&
1277 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1278 CTDB_REC_RO_HAVE_DELEGATIONS))) {
1279 goto migrate;
1283 /* We are the dmaster or readonly delegation */
1284 h->header = header;
1285 h->data = data;
1286 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1287 CTDB_REC_RO_HAVE_DELEGATIONS)) {
1288 h->readonly = true;
1290 return 0;
1292 migrate:
1293 do_migrate = true;
1294 err = EAGAIN;
1296 failed:
1297 if (data.dptr != NULL) {
1298 free(data.dptr);
1300 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1301 if (ret != 0) {
1302 DEBUG(DEBUG_ERR,
1303 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1304 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1305 return EIO;
1308 if (do_migrate) {
1309 ctdb_fetch_lock_migrate(req);
1311 return err;
1314 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1316 struct ctdb_fetch_lock_state *state = tevent_req_data(
1317 req, struct ctdb_fetch_lock_state);
1318 struct ctdb_req_call request;
1319 struct tevent_req *subreq;
1321 ZERO_STRUCT(request);
1322 request.flags = CTDB_IMMEDIATE_MIGRATION;
1323 if (state->readonly) {
1324 request.flags |= CTDB_WANT_READONLY;
1326 request.db_id = state->h->db->db_id;
1327 request.callid = CTDB_NULL_FUNC;
1328 request.key = state->h->key;
1329 request.calldata = tdb_null;
1331 subreq = ctdb_client_call_send(state, state->ev, state->client,
1332 &request);
1333 if (tevent_req_nomem(subreq, req)) {
1334 return;
1337 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1340 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1342 struct tevent_req *req = tevent_req_callback_data(
1343 subreq, struct tevent_req);
1344 struct ctdb_fetch_lock_state *state = tevent_req_data(
1345 req, struct ctdb_fetch_lock_state);
1346 struct ctdb_reply_call *reply;
1347 int ret;
1348 bool status;
1350 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1351 TALLOC_FREE(subreq);
1352 if (! status) {
1353 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1354 state->h->db->db_name, ret));
1355 tevent_req_error(req, ret);
1356 return;
1359 if (reply->status != 0) {
1360 tevent_req_error(req, EIO);
1361 return;
1363 talloc_free(reply);
1365 ret = ctdb_fetch_lock_check(req);
1366 if (ret != 0) {
1367 if (ret != EAGAIN) {
1368 tevent_req_error(req, ret);
1370 return;
1373 tevent_req_done(req);
1376 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1378 int ret;
1380 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1381 if (ret != 0) {
1382 DEBUG(DEBUG_ERR,
1383 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1384 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1386 free(h->data.dptr);
1387 return 0;
1390 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1391 struct ctdb_ltdb_header *header,
1392 TALLOC_CTX *mem_ctx,
1393 TDB_DATA *data, int *perr)
1395 struct ctdb_fetch_lock_state *state = tevent_req_data(
1396 req, struct ctdb_fetch_lock_state);
1397 struct ctdb_record_handle *h = state->h;
1398 int err;
1400 if (tevent_req_is_unix_error(req, &err)) {
1401 if (perr != NULL) {
1402 TALLOC_FREE(state->h);
1403 *perr = err;
1405 return NULL;
1408 if (header != NULL) {
1409 *header = h->header;
1411 if (data != NULL) {
1412 size_t offset;
1414 offset = ctdb_ltdb_header_len(&h->header);
1416 data->dsize = h->data.dsize - offset;
1417 if (data->dsize == 0) {
1418 data->dptr = NULL;
1419 } else {
1420 data->dptr = talloc_memdup(mem_ctx,
1421 h->data.dptr + offset,
1422 data->dsize);
1423 if (data->dptr == NULL) {
1424 TALLOC_FREE(state->h);
1425 if (perr != NULL) {
1426 *perr = ENOMEM;
1428 return NULL;
1433 talloc_set_destructor(h, ctdb_record_handle_destructor);
1434 return h;
1437 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1438 struct ctdb_client_context *client,
1439 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1440 struct ctdb_record_handle **out,
1441 struct ctdb_ltdb_header *header, TDB_DATA *data)
1443 struct tevent_req *req;
1444 struct ctdb_record_handle *h;
1445 int ret;
1447 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1448 if (req == NULL) {
1449 return ENOMEM;
1452 tevent_req_poll(req, ev);
1454 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
1455 if (h == NULL) {
1456 return ret;
1459 *out = h;
1460 return 0;
1463 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1465 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1466 TDB_DATA rec[2];
1467 size_t np;
1468 int ret;
1470 /* Cannot modify the record if it was obtained as a readonly copy */
1471 if (h->readonly) {
1472 return EINVAL;
1475 /* Check if the new data is same */
1476 if (h->data.dsize == data.dsize &&
1477 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1478 /* No need to do anything */
1479 return 0;
1482 ctdb_ltdb_header_push(&h->header, header, &np);
1484 rec[0].dsize = np;
1485 rec[0].dptr = header;
1487 rec[1].dsize = data.dsize;
1488 rec[1].dptr = data.dptr;
1490 ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1491 if (ret != 0) {
1492 DEBUG(DEBUG_ERR,
1493 ("store_record: %s tdb_storev failed, %s\n",
1494 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1495 return EIO;
1498 return 0;
1501 struct ctdb_delete_record_state {
1502 struct ctdb_record_handle *h;
1505 static void ctdb_delete_record_done(struct tevent_req *subreq);
1507 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1508 struct tevent_context *ev,
1509 struct ctdb_record_handle *h)
1511 struct tevent_req *req, *subreq;
1512 struct ctdb_delete_record_state *state;
1513 struct ctdb_key_data key;
1514 struct ctdb_req_control request;
1515 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1516 TDB_DATA rec;
1517 size_t np;
1518 int ret;
1520 req = tevent_req_create(mem_ctx, &state,
1521 struct ctdb_delete_record_state);
1522 if (req == NULL) {
1523 return NULL;
1526 state->h = h;
1528 /* Cannot delete the record if it was obtained as a readonly copy */
1529 if (h->readonly) {
1530 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1531 h->db->db_name));
1532 tevent_req_error(req, EINVAL);
1533 return tevent_req_post(req, ev);
1536 ctdb_ltdb_header_push(&h->header, header, &np);
1538 rec.dsize = np;
1539 rec.dptr = header;
1541 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1542 if (ret != 0) {
1543 DEBUG(DEBUG_ERR,
1544 ("fetch_lock delete: %s tdb_sore failed, %s\n",
1545 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1546 tevent_req_error(req, EIO);
1547 return tevent_req_post(req, ev);
1550 key.db_id = h->db->db_id;
1551 key.header = h->header;
1552 key.key = h->key;
1554 ctdb_req_control_schedule_for_deletion(&request, &key);
1555 subreq = ctdb_client_control_send(state, ev, h->client,
1556 ctdb_client_pnn(h->client),
1557 tevent_timeval_zero(),
1558 &request);
1559 if (tevent_req_nomem(subreq, req)) {
1560 return tevent_req_post(req, ev);
1562 tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
1564 return req;
1567 static void ctdb_delete_record_done(struct tevent_req *subreq)
1569 struct tevent_req *req = tevent_req_callback_data(
1570 subreq, struct tevent_req);
1571 struct ctdb_delete_record_state *state = tevent_req_data(
1572 req, struct ctdb_delete_record_state);
1573 int ret;
1574 bool status;
1576 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1577 TALLOC_FREE(subreq);
1578 if (! status) {
1579 DEBUG(DEBUG_ERR,
1580 ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1581 "ret=%d\n", state->h->db->db_name, ret));
1582 tevent_req_error(req, ret);
1583 return;
1586 tevent_req_done(req);
1589 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1591 int err;
1593 if (tevent_req_is_unix_error(req, &err)) {
1594 if (perr != NULL) {
1595 *perr = err;
1597 return false;
1600 return true;
1604 int ctdb_delete_record(struct ctdb_record_handle *h)
1606 struct tevent_context *ev = h->ev;
1607 TALLOC_CTX *mem_ctx;
1608 struct tevent_req *req;
1609 int ret;
1610 bool status;
1612 mem_ctx = talloc_new(NULL);
1613 if (mem_ctx == NULL) {
1614 return ENOMEM;
1617 req = ctdb_delete_record_send(mem_ctx, ev, h);
1618 if (req == NULL) {
1619 talloc_free(mem_ctx);
1620 return ENOMEM;
1623 tevent_req_poll(req, ev);
1625 status = ctdb_delete_record_recv(req, &ret);
1626 talloc_free(mem_ctx);
1627 if (! status) {
1628 return ret;
1631 return 0;
1635 * Global lock functions
1638 struct ctdb_g_lock_lock_state {
1639 struct tevent_context *ev;
1640 struct ctdb_client_context *client;
1641 struct ctdb_db_context *db;
1642 TDB_DATA key;
1643 struct ctdb_server_id my_sid;
1644 enum ctdb_g_lock_type lock_type;
1645 struct ctdb_record_handle *h;
1646 /* state for verification of active locks */
1647 struct ctdb_g_lock_list *lock_list;
1648 unsigned int current;
1651 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1652 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1653 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1654 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1655 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1657 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1658 enum ctdb_g_lock_type l2)
1660 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1661 return false;
1663 return true;
1666 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1667 struct tevent_context *ev,
1668 struct ctdb_client_context *client,
1669 struct ctdb_db_context *db,
1670 const char *keyname,
1671 struct ctdb_server_id *sid,
1672 bool readonly)
1674 struct tevent_req *req, *subreq;
1675 struct ctdb_g_lock_lock_state *state;
1677 req = tevent_req_create(mem_ctx, &state,
1678 struct ctdb_g_lock_lock_state);
1679 if (req == NULL) {
1680 return NULL;
1683 state->ev = ev;
1684 state->client = client;
1685 state->db = db;
1686 state->key.dptr = discard_const(keyname);
1687 state->key.dsize = strlen(keyname) + 1;
1688 state->my_sid = *sid;
1689 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1691 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1692 false);
1693 if (tevent_req_nomem(subreq, req)) {
1694 return tevent_req_post(req, ev);
1696 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1698 return req;
1701 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1703 struct tevent_req *req = tevent_req_callback_data(
1704 subreq, struct tevent_req);
1705 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1706 req, struct ctdb_g_lock_lock_state);
1707 TDB_DATA data;
1708 size_t np;
1709 int ret = 0;
1711 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1712 TALLOC_FREE(subreq);
1713 if (state->h == NULL) {
1714 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1715 (char *)state->key.dptr));
1716 tevent_req_error(req, ret);
1717 return;
1720 if (state->lock_list != NULL) {
1721 TALLOC_FREE(state->lock_list);
1722 state->current = 0;
1725 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1726 &state->lock_list, &np);
1727 talloc_free(data.dptr);
1728 if (ret != 0) {
1729 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1730 (char *)state->key.dptr));
1731 tevent_req_error(req, ret);
1732 return;
1735 ctdb_g_lock_lock_process_locks(req);
1738 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1740 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1741 req, struct ctdb_g_lock_lock_state);
1742 struct tevent_req *subreq;
1743 struct ctdb_g_lock *lock;
1744 bool check_server = false;
1745 int ret;
1747 while (state->current < state->lock_list->num) {
1748 lock = &state->lock_list->lock[state->current];
1750 /* We should not ask for the same lock more than once */
1751 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1752 DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1753 (char *)state->key.dptr));
1754 tevent_req_error(req, EDEADLK);
1755 return;
1758 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1759 check_server = true;
1760 break;
1763 state->current += 1;
1766 if (check_server) {
1767 struct ctdb_req_control request;
1769 ctdb_req_control_process_exists(&request, lock->sid.pid);
1770 subreq = ctdb_client_control_send(state, state->ev,
1771 state->client,
1772 lock->sid.vnn,
1773 tevent_timeval_zero(),
1774 &request);
1775 if (tevent_req_nomem(subreq, req)) {
1776 return;
1778 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1779 return;
1782 /* There is no conflict, add ourself to the lock_list */
1783 state->lock_list->lock = talloc_realloc(state->lock_list,
1784 state->lock_list->lock,
1785 struct ctdb_g_lock,
1786 state->lock_list->num + 1);
1787 if (state->lock_list->lock == NULL) {
1788 tevent_req_error(req, ENOMEM);
1789 return;
1792 lock = &state->lock_list->lock[state->lock_list->num];
1793 lock->type = state->lock_type;
1794 lock->sid = state->my_sid;
1795 state->lock_list->num += 1;
1797 ret = ctdb_g_lock_lock_update(req);
1798 if (ret != 0) {
1799 tevent_req_error(req, ret);
1800 return;
1803 TALLOC_FREE(state->h);
1804 tevent_req_done(req);
1807 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1809 struct tevent_req *req = tevent_req_callback_data(
1810 subreq, struct tevent_req);
1811 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1812 req, struct ctdb_g_lock_lock_state);
1813 struct ctdb_reply_control *reply;
1814 int ret, value;
1815 bool status;
1817 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1818 TALLOC_FREE(subreq);
1819 if (! status) {
1820 DEBUG(DEBUG_ERR,
1821 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1822 (char *)state->key.dptr, ret));
1823 tevent_req_error(req, ret);
1824 return;
1827 ret = ctdb_reply_control_process_exists(reply, &value);
1828 if (ret != 0) {
1829 tevent_req_error(req, ret);
1830 return;
1832 talloc_free(reply);
1834 if (value == 0) {
1835 /* server process exists, need to retry */
1836 TALLOC_FREE(state->h);
1837 subreq = tevent_wakeup_send(state, state->ev,
1838 tevent_timeval_current_ofs(0,1000));
1839 if (tevent_req_nomem(subreq, req)) {
1840 return;
1842 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1843 return;
1846 /* server process does not exist, remove conflicting entry */
1847 state->lock_list->lock[state->current] =
1848 state->lock_list->lock[state->lock_list->num-1];
1849 state->lock_list->num -= 1;
1851 ret = ctdb_g_lock_lock_update(req);
1852 if (ret != 0) {
1853 tevent_req_error(req, ret);
1854 return;
1857 ctdb_g_lock_lock_process_locks(req);
1860 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1862 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1863 req, struct ctdb_g_lock_lock_state);
1864 TDB_DATA data;
1865 size_t np;
1866 int ret;
1868 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1869 data.dptr = talloc_size(state, data.dsize);
1870 if (data.dptr == NULL) {
1871 return ENOMEM;
1874 ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
1875 ret = ctdb_store_record(state->h, data);
1876 talloc_free(data.dptr);
1877 return ret;
1880 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1882 struct tevent_req *req = tevent_req_callback_data(
1883 subreq, struct tevent_req);
1884 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1885 req, struct ctdb_g_lock_lock_state);
1886 bool success;
1888 success = tevent_wakeup_recv(subreq);
1889 TALLOC_FREE(subreq);
1890 if (! success) {
1891 tevent_req_error(req, ENOMEM);
1892 return;
1895 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1896 state->db, state->key, false);
1897 if (tevent_req_nomem(subreq, req)) {
1898 return;
1900 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1903 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1905 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1906 req, struct ctdb_g_lock_lock_state);
1907 int err;
1909 TALLOC_FREE(state->h);
1911 if (tevent_req_is_unix_error(req, &err)) {
1912 if (perr != NULL) {
1913 *perr = err;
1915 return false;
1918 return true;
1921 struct ctdb_g_lock_unlock_state {
1922 struct tevent_context *ev;
1923 struct ctdb_client_context *client;
1924 struct ctdb_db_context *db;
1925 TDB_DATA key;
1926 struct ctdb_server_id my_sid;
1927 struct ctdb_record_handle *h;
1928 struct ctdb_g_lock_list *lock_list;
1931 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1932 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1933 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1935 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1936 struct tevent_context *ev,
1937 struct ctdb_client_context *client,
1938 struct ctdb_db_context *db,
1939 const char *keyname,
1940 struct ctdb_server_id sid)
1942 struct tevent_req *req, *subreq;
1943 struct ctdb_g_lock_unlock_state *state;
1945 req = tevent_req_create(mem_ctx, &state,
1946 struct ctdb_g_lock_unlock_state);
1947 if (req == NULL) {
1948 return NULL;
1951 state->ev = ev;
1952 state->client = client;
1953 state->db = db;
1954 state->key.dptr = discard_const(keyname);
1955 state->key.dsize = strlen(keyname) + 1;
1956 state->my_sid = sid;
1958 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1959 false);
1960 if (tevent_req_nomem(subreq, req)) {
1961 return tevent_req_post(req, ev);
1963 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1965 return req;
1968 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1970 struct tevent_req *req = tevent_req_callback_data(
1971 subreq, struct tevent_req);
1972 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1973 req, struct ctdb_g_lock_unlock_state);
1974 TDB_DATA data;
1975 size_t np;
1976 int ret = 0;
1978 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1979 TALLOC_FREE(subreq);
1980 if (state->h == NULL) {
1981 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1982 (char *)state->key.dptr));
1983 tevent_req_error(req, ret);
1984 return;
1987 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1988 &state->lock_list, &np);
1989 if (ret != 0) {
1990 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1991 (char *)state->key.dptr));
1992 tevent_req_error(req, ret);
1993 return;
1996 ret = ctdb_g_lock_unlock_update(req);
1997 if (ret != 0) {
1998 tevent_req_error(req, ret);
1999 return;
2002 if (state->lock_list->num == 0) {
2003 subreq = ctdb_delete_record_send(state, state->ev, state->h);
2004 if (tevent_req_nomem(subreq, req)) {
2005 return;
2007 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
2008 req);
2009 return;
2012 TALLOC_FREE(state->h);
2013 tevent_req_done(req);
2016 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
2018 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2019 req, struct ctdb_g_lock_unlock_state);
2020 struct ctdb_g_lock *lock;
2021 int ret, i;
2023 for (i=0; i<state->lock_list->num; i++) {
2024 lock = &state->lock_list->lock[i];
2026 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
2027 break;
2031 if (i < state->lock_list->num) {
2032 state->lock_list->lock[i] =
2033 state->lock_list->lock[state->lock_list->num-1];
2034 state->lock_list->num -= 1;
2037 if (state->lock_list->num != 0) {
2038 TDB_DATA data;
2039 size_t np;
2041 data.dsize = ctdb_g_lock_list_len(state->lock_list);
2042 data.dptr = talloc_size(state, data.dsize);
2043 if (data.dptr == NULL) {
2044 return ENOMEM;
2047 ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
2048 ret = ctdb_store_record(state->h, data);
2049 talloc_free(data.dptr);
2050 if (ret != 0) {
2051 return ret;
2055 return 0;
2058 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2060 struct tevent_req *req = tevent_req_callback_data(
2061 subreq, struct tevent_req);
2062 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2063 req, struct ctdb_g_lock_unlock_state);
2064 int ret;
2065 bool status;
2067 status = ctdb_delete_record_recv(subreq, &ret);
2068 if (! status) {
2069 DEBUG(DEBUG_ERR,
2070 ("g_lock_unlock %s delete record failed, ret=%d\n",
2071 (char *)state->key.dptr, ret));
2072 tevent_req_error(req, ret);
2073 return;
2076 TALLOC_FREE(state->h);
2077 tevent_req_done(req);
2080 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2082 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2083 req, struct ctdb_g_lock_unlock_state);
2084 int err;
2086 TALLOC_FREE(state->h);
2088 if (tevent_req_is_unix_error(req, &err)) {
2089 if (perr != NULL) {
2090 *perr = err;
2092 return false;
2095 return true;
2099 * Persistent database functions
2101 struct ctdb_transaction_start_state {
2102 struct tevent_context *ev;
2103 struct ctdb_client_context *client;
2104 struct timeval timeout;
2105 struct ctdb_transaction_handle *h;
2106 uint32_t destnode;
2109 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
2110 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
2112 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2113 struct tevent_context *ev,
2114 struct ctdb_client_context *client,
2115 struct timeval timeout,
2116 struct ctdb_db_context *db,
2117 bool readonly)
2119 struct ctdb_transaction_start_state *state;
2120 struct tevent_req *req, *subreq;
2121 struct ctdb_transaction_handle *h;
2123 req = tevent_req_create(mem_ctx, &state,
2124 struct ctdb_transaction_start_state);
2125 if (req == NULL) {
2126 return NULL;
2129 if (ctdb_db_volatile(db)) {
2130 tevent_req_error(req, EINVAL);
2131 return tevent_req_post(req, ev);
2134 state->ev = ev;
2135 state->client = client;
2136 state->destnode = ctdb_client_pnn(client);
2138 h = talloc_zero(db, struct ctdb_transaction_handle);
2139 if (tevent_req_nomem(h, req)) {
2140 return tevent_req_post(req, ev);
2143 h->ev = ev;
2144 h->client = client;
2145 h->db = db;
2146 h->readonly = readonly;
2147 h->updated = false;
2149 /* SRVID is unique for databases, so client can have transactions
2150 * active for multiple databases */
2151 h->sid = ctdb_client_get_server_id(client, db->db_id);
2153 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2154 if (tevent_req_nomem(h->recbuf, req)) {
2155 return tevent_req_post(req, ev);
2158 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2159 if (tevent_req_nomem(h->lock_name, req)) {
2160 return tevent_req_post(req, ev);
2163 state->h = h;
2165 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2166 if (tevent_req_nomem(subreq, req)) {
2167 return tevent_req_post(req, ev);
2169 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
2171 return req;
2174 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2176 struct tevent_req *req = tevent_req_callback_data(
2177 subreq, struct tevent_req);
2178 struct ctdb_transaction_start_state *state = tevent_req_data(
2179 req, struct ctdb_transaction_start_state);
2180 bool status;
2181 int ret;
2183 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2184 TALLOC_FREE(subreq);
2185 if (! status) {
2186 DEBUG(DEBUG_ERR,
2187 ("transaction_start: %s attach g_lock.tdb failed\n",
2188 state->h->db->db_name));
2189 tevent_req_error(req, ret);
2190 return;
2193 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2194 state->h->db_g_lock,
2195 state->h->lock_name,
2196 &state->h->sid, state->h->readonly);
2197 if (tevent_req_nomem(subreq, req)) {
2198 return;
2200 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2203 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2205 struct tevent_req *req = tevent_req_callback_data(
2206 subreq, struct tevent_req);
2207 struct ctdb_transaction_start_state *state = tevent_req_data(
2208 req, struct ctdb_transaction_start_state);
2209 int ret;
2210 bool status;
2212 status = ctdb_g_lock_lock_recv(subreq, &ret);
2213 TALLOC_FREE(subreq);
2214 if (! status) {
2215 DEBUG(DEBUG_ERR,
2216 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2217 state->h->db->db_name, ret));
2218 tevent_req_error(req, ret);
2219 return;
2222 tevent_req_done(req);
2225 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2226 struct tevent_req *req,
2227 int *perr)
2229 struct ctdb_transaction_start_state *state = tevent_req_data(
2230 req, struct ctdb_transaction_start_state);
2231 int err;
2233 if (tevent_req_is_unix_error(req, &err)) {
2234 if (perr != NULL) {
2235 *perr = err;
2237 return NULL;
2240 return state->h;
2243 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2244 struct ctdb_client_context *client,
2245 struct timeval timeout,
2246 struct ctdb_db_context *db, bool readonly,
2247 struct ctdb_transaction_handle **out)
2249 struct tevent_req *req;
2250 struct ctdb_transaction_handle *h;
2251 int ret;
2253 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2254 readonly);
2255 if (req == NULL) {
2256 return ENOMEM;
2259 tevent_req_poll(req, ev);
2261 h = ctdb_transaction_start_recv(req, &ret);
2262 if (h == NULL) {
2263 return ret;
2266 *out = h;
2267 return 0;
2270 struct ctdb_transaction_record_fetch_state {
2271 TDB_DATA key, data;
2272 struct ctdb_ltdb_header header;
2273 bool found;
2276 static int ctdb_transaction_record_fetch_traverse(
2277 uint32_t reqid,
2278 struct ctdb_ltdb_header *nullheader,
2279 TDB_DATA key, TDB_DATA data,
2280 void *private_data)
2282 struct ctdb_transaction_record_fetch_state *state =
2283 (struct ctdb_transaction_record_fetch_state *)private_data;
2285 if (state->key.dsize == key.dsize &&
2286 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2287 int ret;
2289 ret = ctdb_ltdb_header_extract(&data, &state->header);
2290 if (ret != 0) {
2291 DEBUG(DEBUG_ERR,
2292 ("record_fetch: Failed to extract header, "
2293 "ret=%d\n", ret));
2294 return 1;
2297 state->data = data;
2298 state->found = true;
2301 return 0;
2304 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2305 TDB_DATA key,
2306 struct ctdb_ltdb_header *header,
2307 TDB_DATA *data)
2309 struct ctdb_transaction_record_fetch_state state;
2310 int ret;
2312 state.key = key;
2313 state.found = false;
2315 ret = ctdb_rec_buffer_traverse(h->recbuf,
2316 ctdb_transaction_record_fetch_traverse,
2317 &state);
2318 if (ret != 0) {
2319 return ret;
2322 if (state.found) {
2323 if (header != NULL) {
2324 *header = state.header;
2326 if (data != NULL) {
2327 *data = state.data;
2329 return 0;
2332 return ENOENT;
2335 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2336 TDB_DATA key,
2337 TALLOC_CTX *mem_ctx, TDB_DATA *data)
2339 TDB_DATA tmp_data;
2340 struct ctdb_ltdb_header header;
2341 int ret;
2343 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2344 if (ret == 0) {
2345 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2346 tmp_data.dsize);
2347 if (data->dptr == NULL) {
2348 return ENOMEM;
2350 data->dsize = tmp_data.dsize;
2351 return 0;
2354 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2355 if (ret != 0) {
2356 return ret;
2359 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
2360 if (ret != 0) {
2361 return ret;
2364 return 0;
2367 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2368 TDB_DATA key, TDB_DATA data)
2370 TALLOC_CTX *tmp_ctx;
2371 struct ctdb_ltdb_header header;
2372 TDB_DATA old_data;
2373 int ret;
2375 if (h->readonly) {
2376 return EINVAL;
2379 tmp_ctx = talloc_new(h);
2380 if (tmp_ctx == NULL) {
2381 return ENOMEM;
2384 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2385 if (ret != 0) {
2386 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2387 if (ret != 0) {
2388 return ret;
2392 if (old_data.dsize == data.dsize &&
2393 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2394 talloc_free(tmp_ctx);
2395 return 0;
2398 header.dmaster = ctdb_client_pnn(h->client);
2399 header.rsn += 1;
2401 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2402 talloc_free(tmp_ctx);
2403 if (ret != 0) {
2404 return ret;
2406 h->updated = true;
2408 return 0;
2411 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2412 TDB_DATA key)
2414 return ctdb_transaction_store_record(h, key, tdb_null);
2417 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2418 uint64_t *seqnum)
2420 const char *keyname = CTDB_DB_SEQNUM_KEY;
2421 TDB_DATA key, data;
2422 struct ctdb_ltdb_header header;
2423 int ret;
2425 key.dptr = discard_const(keyname);
2426 key.dsize = strlen(keyname) + 1;
2428 ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2429 if (ret != 0) {
2430 DEBUG(DEBUG_ERR,
2431 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2432 h->db->db_name, ret));
2433 return ret;
2436 if (data.dsize == 0) {
2437 /* initial data */
2438 *seqnum = 0;
2439 return 0;
2442 if (data.dsize != sizeof(uint64_t)) {
2443 talloc_free(data.dptr);
2444 return EINVAL;
2447 *seqnum = *(uint64_t *)data.dptr;
2449 talloc_free(data.dptr);
2450 return 0;
2453 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2454 uint64_t seqnum)
2456 const char *keyname = CTDB_DB_SEQNUM_KEY;
2457 TDB_DATA key, data;
2459 key.dptr = discard_const(keyname);
2460 key.dsize = strlen(keyname) + 1;
2462 data.dptr = (uint8_t *)&seqnum;
2463 data.dsize = sizeof(seqnum);
2465 return ctdb_transaction_store_record(h, key, data);
2468 struct ctdb_transaction_commit_state {
2469 struct tevent_context *ev;
2470 struct timeval timeout;
2471 struct ctdb_transaction_handle *h;
2472 uint64_t seqnum;
2475 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2476 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2478 struct tevent_req *ctdb_transaction_commit_send(
2479 TALLOC_CTX *mem_ctx,
2480 struct tevent_context *ev,
2481 struct timeval timeout,
2482 struct ctdb_transaction_handle *h)
2484 struct tevent_req *req, *subreq;
2485 struct ctdb_transaction_commit_state *state;
2486 struct ctdb_req_control request;
2487 int ret;
2489 req = tevent_req_create(mem_ctx, &state,
2490 struct ctdb_transaction_commit_state);
2491 if (req == NULL) {
2492 return NULL;
2495 state->ev = ev;
2496 state->timeout = timeout;
2497 state->h = h;
2499 ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2500 if (ret != 0) {
2501 tevent_req_error(req, ret);
2502 return tevent_req_post(req, ev);
2505 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2506 if (ret != 0) {
2507 tevent_req_error(req, ret);
2508 return tevent_req_post(req, ev);
2511 ctdb_req_control_trans3_commit(&request, h->recbuf);
2512 subreq = ctdb_client_control_send(state, ev, h->client,
2513 ctdb_client_pnn(h->client),
2514 timeout, &request);
2515 if (tevent_req_nomem(subreq, req)) {
2516 return tevent_req_post(req, ev);
2518 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2520 return req;
2523 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2525 struct tevent_req *req = tevent_req_callback_data(
2526 subreq, struct tevent_req);
2527 struct ctdb_transaction_commit_state *state = tevent_req_data(
2528 req, struct ctdb_transaction_commit_state);
2529 struct ctdb_transaction_handle *h = state->h;
2530 struct ctdb_reply_control *reply;
2531 uint64_t seqnum;
2532 int ret;
2533 bool status;
2535 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2536 TALLOC_FREE(subreq);
2537 if (! status) {
2538 DEBUG(DEBUG_ERR,
2539 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2540 h->db->db_name, ret));
2541 tevent_req_error(req, ret);
2542 return;
2545 ret = ctdb_reply_control_trans3_commit(reply);
2546 talloc_free(reply);
2548 if (ret != 0) {
2549 /* Control failed due to recovery */
2551 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2552 if (ret != 0) {
2553 tevent_req_error(req, ret);
2554 return;
2557 if (seqnum == state->seqnum) {
2558 struct ctdb_req_control request;
2560 /* try again */
2561 ctdb_req_control_trans3_commit(&request,
2562 state->h->recbuf);
2563 subreq = ctdb_client_control_send(
2564 state, state->ev, state->h->client,
2565 ctdb_client_pnn(state->h->client),
2566 state->timeout, &request);
2567 if (tevent_req_nomem(subreq, req)) {
2568 return;
2570 tevent_req_set_callback(subreq,
2571 ctdb_transaction_commit_done,
2572 req);
2573 return;
2576 if (seqnum != state->seqnum + 1) {
2577 DEBUG(DEBUG_ERR,
2578 ("transaction_commit: %s seqnum mismatch "
2579 "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2580 state->h->db->db_name, seqnum, state->seqnum));
2581 tevent_req_error(req, EIO);
2582 return;
2586 /* trans3_commit successful */
2587 subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2588 h->db_g_lock, h->lock_name, h->sid);
2589 if (tevent_req_nomem(subreq, req)) {
2590 return;
2592 tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2593 req);
2596 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2598 struct tevent_req *req = tevent_req_callback_data(
2599 subreq, struct tevent_req);
2600 struct ctdb_transaction_commit_state *state = tevent_req_data(
2601 req, struct ctdb_transaction_commit_state);
2602 int ret;
2603 bool status;
2605 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2606 TALLOC_FREE(subreq);
2607 if (! status) {
2608 DEBUG(DEBUG_ERR,
2609 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2610 state->h->db->db_name, ret));
2611 tevent_req_error(req, ret);
2612 return;
2615 talloc_free(state->h);
2616 tevent_req_done(req);
2619 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2621 int err;
2623 if (tevent_req_is_unix_error(req, &err)) {
2624 if (perr != NULL) {
2625 *perr = err;
2627 return false;
2630 return true;
2633 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2635 struct tevent_context *ev = h->ev;
2636 TALLOC_CTX *mem_ctx;
2637 struct tevent_req *req;
2638 int ret;
2639 bool status;
2641 if (h->readonly || ! h->updated) {
2642 return ctdb_transaction_cancel(h);
2645 mem_ctx = talloc_new(NULL);
2646 if (mem_ctx == NULL) {
2647 return ENOMEM;
2650 req = ctdb_transaction_commit_send(mem_ctx, ev,
2651 tevent_timeval_zero(), h);
2652 if (req == NULL) {
2653 talloc_free(mem_ctx);
2654 return ENOMEM;
2657 tevent_req_poll(req, ev);
2659 status = ctdb_transaction_commit_recv(req, &ret);
2660 if (! status) {
2661 talloc_free(mem_ctx);
2662 return ret;
2665 talloc_free(mem_ctx);
2666 return 0;
2669 struct ctdb_transaction_cancel_state {
2670 struct tevent_context *ev;
2671 struct ctdb_transaction_handle *h;
2672 struct timeval timeout;
2675 static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2677 struct tevent_req *ctdb_transaction_cancel_send(
2678 TALLOC_CTX *mem_ctx,
2679 struct tevent_context *ev,
2680 struct timeval timeout,
2681 struct ctdb_transaction_handle *h)
2683 struct tevent_req *req, *subreq;
2684 struct ctdb_transaction_cancel_state *state;
2686 req = tevent_req_create(mem_ctx, &state,
2687 struct ctdb_transaction_cancel_state);
2688 if (req == NULL) {
2689 return NULL;
2692 state->ev = ev;
2693 state->h = h;
2694 state->timeout = timeout;
2696 subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2697 state->h->db_g_lock,
2698 state->h->lock_name, state->h->sid);
2699 if (tevent_req_nomem(subreq, req)) {
2700 return tevent_req_post(req, ev);
2702 tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
2703 req);
2705 return req;
2708 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2710 struct tevent_req *req = tevent_req_callback_data(
2711 subreq, struct tevent_req);
2712 struct ctdb_transaction_cancel_state *state = tevent_req_data(
2713 req, struct ctdb_transaction_cancel_state);
2714 int ret;
2715 bool status;
2717 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2718 TALLOC_FREE(subreq);
2719 if (! status) {
2720 DEBUG(DEBUG_ERR,
2721 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2722 state->h->db->db_name, ret));
2723 talloc_free(state->h);
2724 tevent_req_error(req, ret);
2725 return;
2728 talloc_free(state->h);
2729 tevent_req_done(req);
2732 bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
2734 int err;
2736 if (tevent_req_is_unix_error(req, &err)) {
2737 if (perr != NULL) {
2738 *perr = err;
2740 return false;
2743 return true;
2746 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2748 struct tevent_context *ev = h->ev;
2749 struct tevent_req *req;
2750 TALLOC_CTX *mem_ctx;
2751 int ret;
2752 bool status;
2754 mem_ctx = talloc_new(NULL);
2755 if (mem_ctx == NULL) {
2756 talloc_free(h);
2757 return ENOMEM;
2760 req = ctdb_transaction_cancel_send(mem_ctx, ev,
2761 tevent_timeval_zero(), h);
2762 if (req == NULL) {
2763 talloc_free(mem_ctx);
2764 talloc_free(h);
2765 return ENOMEM;
2768 tevent_req_poll(req, ev);
2770 status = ctdb_transaction_cancel_recv(req, &ret);
2771 if (! status) {
2772 talloc_free(mem_ctx);
2773 return ret;
2776 talloc_free(mem_ctx);
2777 return 0;
2781 * TODO:
2783 * In future Samba should register SERVER_ID.
2784 * Make that structure same as struct srvid {}.