s3:dbwrap_ctdb: use db_ctdb_ltdb_fetch() inside db_ctdb_transaction_fetch()
[Samba/cd1.git] / source3 / lib / dbwrap_ctdb.c
blobe4399c60381e7f26552bcdb75f00cb73960d927c
/*
   Unix SMB/CIFS implementation.
   Database interface wrapper around ctdbd
   Copyright (C) Volker Lendecke 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-transaction state for a ctdb-backed database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/* true while ctdb_replay_transaction() re-runs the recorded operations */
	bool in_replay;
	/*
	 * Operations done under the transaction are recorded in two
	 * marshall buffers:
	 *  - m_all collects both reads and writes,
	 *  - m_write collects the writes only.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of nested transaction_start calls still outstanding */
	uint32_t nesting;
	/* set when a nested level was cancelled; makes the commit fail */
	bool nested_cancel;
};
40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
78 /**
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TALLOC_CTX *mem_ctx,
86 TDB_DATA *data)
88 TDB_DATA rec;
89 NTSTATUS status;
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
94 if (data) {
95 ZERO_STRUCTP(data);
97 if (header) {
98 header->dmaster = (uint32_t)-1;
99 header->rsn = 0;
101 goto done;
104 if (header) {
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
108 if (data) {
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
111 data->dptr = NULL;
112 } else {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114 rec.dptr
115 + sizeof(struct ctdb_ltdb_header),
116 data->dsize);
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
119 goto done;
124 status = NT_STATUS_OK;
126 done:
127 SAFE_FREE(rec.dptr);
128 return status;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136 TDB_DATA key,
137 struct ctdb_ltdb_header *header,
138 TDB_DATA data)
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
141 TDB_DATA rec;
142 int ret;
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
168 of the record
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
171 TDB_DATA key,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA data)
175 size_t length;
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181 if (d == NULL) {
182 return NULL;
184 d->length = length;
185 d->reqid = reqid;
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
188 if (header) {
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192 } else {
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
196 return d;
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
203 uint64_t db_id,
204 uint32_t reqid,
205 TDB_DATA key,
206 struct ctdb_ltdb_header *header,
207 TDB_DATA data)
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214 if (r == NULL) {
215 talloc_free(m);
216 return NULL;
219 if (m == NULL) {
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222 if (m == NULL) {
223 goto done;
225 m->db_id = db_id;
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
233 if (m2 == NULL) {
234 talloc_free(m);
235 goto done;
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
240 m2->count++;
242 done:
243 talloc_free(r);
244 return m2;
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
250 TDB_DATA data;
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
253 return data;
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263 uint32_t *reqid,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
267 if (r == NULL) {
268 r = (struct ctdb_rec_data *)&m->data[0];
269 } else {
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
273 if (reqid != NULL) {
274 *reqid = r->reqid;
277 if (key != NULL) {
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
281 if (data != NULL) {
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
292 return NULL;
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
297 return r;
303 * CTDB transaction destructor
305 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
307 tdb_transaction_cancel(h->ctx->wtdb->tdb);
308 return 0;
312 * start a transaction on a ctdb database:
313 * - lock the transaction lock key
314 * - start the tdb transaction
316 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
318 struct db_record *rh;
319 TDB_DATA key;
320 TALLOC_CTX *tmp_ctx;
321 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
322 int ret;
323 struct db_ctdb_ctx *ctx = h->ctx;
324 TDB_DATA data;
326 key.dptr = (uint8_t *)discard_const(keyname);
327 key.dsize = strlen(keyname);
329 again:
330 tmp_ctx = talloc_new(h);
332 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
333 if (rh == NULL) {
334 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
335 talloc_free(tmp_ctx);
336 return -1;
338 talloc_free(rh);
340 ret = tdb_transaction_start(ctx->wtdb->tdb);
341 if (ret != 0) {
342 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
343 talloc_free(tmp_ctx);
344 return -1;
347 data = tdb_fetch(ctx->wtdb->tdb, key);
348 if ((data.dptr == NULL) ||
349 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
350 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
351 SAFE_FREE(data.dptr);
352 tdb_transaction_cancel(ctx->wtdb->tdb);
353 talloc_free(tmp_ctx);
354 goto again;
357 SAFE_FREE(data.dptr);
358 talloc_free(tmp_ctx);
360 return 0;
365 * CTDB dbwrap API: transaction_start function
366 * starts a transaction on a persistent database
368 static int db_ctdb_transaction_start(struct db_context *db)
370 struct db_ctdb_transaction_handle *h;
371 int ret;
372 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
373 struct db_ctdb_ctx);
375 if (!db->persistent) {
376 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
377 ctx->db_id));
378 return -1;
381 if (ctx->transaction) {
382 ctx->transaction->nesting++;
383 return 0;
386 h = talloc_zero(db, struct db_ctdb_transaction_handle);
387 if (h == NULL) {
388 DEBUG(0,(__location__ " oom for transaction handle\n"));
389 return -1;
392 h->ctx = ctx;
394 ret = db_ctdb_transaction_fetch_start(h);
395 if (ret != 0) {
396 talloc_free(h);
397 return -1;
400 talloc_set_destructor(h, db_ctdb_transaction_destructor);
402 ctx->transaction = h;
404 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
406 return 0;
412 fetch a record inside a transaction
414 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
415 TALLOC_CTX *mem_ctx,
416 TDB_DATA key, TDB_DATA *data)
418 struct db_ctdb_transaction_handle *h = db->transaction;
419 NTSTATUS status;
421 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
423 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
424 *data = tdb_null;
425 } else if (!NT_STATUS_IS_OK(status)) {
426 return -1;
429 if (!h->in_replay) {
430 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
431 if (h->m_all == NULL) {
432 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
433 data->dsize = 0;
434 talloc_free(data->dptr);
435 return -1;
439 return 0;
443 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
444 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
446 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
447 TALLOC_CTX *mem_ctx,
448 TDB_DATA key)
450 struct db_record *result;
451 TDB_DATA ctdb_data;
453 if (!(result = talloc(mem_ctx, struct db_record))) {
454 DEBUG(0, ("talloc failed\n"));
455 return NULL;
458 result->private_data = ctx->transaction;
460 result->key.dsize = key.dsize;
461 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
462 if (result->key.dptr == NULL) {
463 DEBUG(0, ("talloc failed\n"));
464 TALLOC_FREE(result);
465 return NULL;
468 result->store = db_ctdb_store_transaction;
469 result->delete_rec = db_ctdb_delete_transaction;
471 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
472 if (ctdb_data.dptr == NULL) {
473 /* create the record */
474 result->value = tdb_null;
475 return result;
478 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
479 result->value.dptr = NULL;
481 if ((result->value.dsize != 0)
482 && !(result->value.dptr = (uint8 *)talloc_memdup(
483 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
484 result->value.dsize))) {
485 DEBUG(0, ("talloc failed\n"));
486 TALLOC_FREE(result);
489 SAFE_FREE(ctdb_data.dptr);
491 return result;
494 static int db_ctdb_record_destructor(struct db_record **recp)
496 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
497 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
498 rec->private_data, struct db_ctdb_transaction_handle);
499 int ret = h->ctx->db->transaction_commit(h->ctx->db);
500 if (ret != 0) {
501 DEBUG(0,(__location__ " transaction_commit failed\n"));
503 return 0;
507 auto-create a transaction for persistent databases
509 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
510 TALLOC_CTX *mem_ctx,
511 TDB_DATA key)
513 int res;
514 struct db_record *rec, **recp;
516 res = db_ctdb_transaction_start(ctx->db);
517 if (res == -1) {
518 return NULL;
521 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
522 if (rec == NULL) {
523 ctx->db->transaction_cancel(ctx->db);
524 return NULL;
527 /* destroy this transaction when we release the lock */
528 recp = talloc(rec, struct db_record *);
529 if (recp == NULL) {
530 ctx->db->transaction_cancel(ctx->db);
531 talloc_free(rec);
532 return NULL;
534 *recp = rec;
535 talloc_set_destructor(recp, db_ctdb_record_destructor);
536 return rec;
541 stores a record inside a transaction
543 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
544 TDB_DATA key, TDB_DATA data)
546 TALLOC_CTX *tmp_ctx = talloc_new(h);
547 int ret;
548 TDB_DATA rec;
549 struct ctdb_ltdb_header header;
550 NTSTATUS status;
552 /* we need the header so we can update the RSN */
553 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
554 if (rec.dptr == NULL) {
555 /* the record doesn't exist - create one with us as dmaster.
556 This is only safe because we are in a transaction and this
557 is a persistent database */
558 ZERO_STRUCT(header);
559 } else {
560 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
561 rec.dsize -= sizeof(struct ctdb_ltdb_header);
562 /* a special case, we are writing the same data that is there now */
563 if (data.dsize == rec.dsize &&
564 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
565 SAFE_FREE(rec.dptr);
566 talloc_free(tmp_ctx);
567 return 0;
569 SAFE_FREE(rec.dptr);
572 header.dmaster = get_my_vnn();
573 header.rsn++;
575 if (!h->in_replay) {
576 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
577 if (h->m_all == NULL) {
578 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
579 talloc_free(tmp_ctx);
580 return -1;
584 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
585 if (h->m_write == NULL) {
586 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
587 talloc_free(tmp_ctx);
588 return -1;
591 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
592 if (NT_STATUS_IS_OK(status)) {
593 ret = 0;
594 } else {
595 ret = -1;
598 talloc_free(tmp_ctx);
600 return ret;
605 a record store inside a transaction
607 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
609 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
610 rec->private_data, struct db_ctdb_transaction_handle);
611 int ret;
613 ret = db_ctdb_transaction_store(h, rec->key, data);
614 if (ret != 0) {
615 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
617 return NT_STATUS_OK;
621 a record delete inside a transaction
623 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
625 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
626 rec->private_data, struct db_ctdb_transaction_handle);
627 int ret;
629 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
630 if (ret != 0) {
631 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
633 return NT_STATUS_OK;
638 replay a transaction
640 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
642 int ret, i;
643 struct ctdb_rec_data *rec = NULL;
645 h->in_replay = true;
646 talloc_free(h->m_write);
647 h->m_write = NULL;
649 ret = db_ctdb_transaction_fetch_start(h);
650 if (ret != 0) {
651 return ret;
654 for (i=0;i<h->m_all->count;i++) {
655 TDB_DATA key, data;
657 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
658 if (rec == NULL) {
659 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
660 goto failed;
663 if (rec->reqid == 0) {
664 /* its a store */
665 if (db_ctdb_transaction_store(h, key, data) != 0) {
666 goto failed;
668 } else {
669 TDB_DATA data2;
670 TALLOC_CTX *tmp_ctx = talloc_new(h);
672 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
673 talloc_free(tmp_ctx);
674 goto failed;
676 if (data2.dsize != data.dsize ||
677 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
678 /* the record has changed on us - we have to give up */
679 talloc_free(tmp_ctx);
680 goto failed;
682 talloc_free(tmp_ctx);
686 return 0;
688 failed:
689 tdb_transaction_cancel(h->ctx->wtdb->tdb);
690 return -1;
695 commit a transaction
697 static int db_ctdb_transaction_commit(struct db_context *db)
699 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
700 struct db_ctdb_ctx);
701 NTSTATUS rets;
702 int ret;
703 int status;
704 int retries = 0;
705 struct db_ctdb_transaction_handle *h = ctx->transaction;
706 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
708 if (h == NULL) {
709 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
710 return -1;
713 if (h->nested_cancel) {
714 db->transaction_cancel(db);
715 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
716 return -1;
719 if (h->nesting != 0) {
720 h->nesting--;
721 return 0;
724 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
726 talloc_set_destructor(h, NULL);
728 /* our commit strategy is quite complex.
730 - we first try to commit the changes to all other nodes
732 - if that works, then we commit locally and we are done
734 - if a commit on another node fails, then we need to cancel
735 the transaction, then restart the transaction (thus
736 opening a window of time for a pending recovery to
737 complete), then replay the transaction, checking all the
738 reads and writes (checking that reads give the same data,
739 and writes succeed). Then we retry the transaction to the
740 other nodes
743 again:
744 if (h->m_write == NULL) {
745 /* no changes were made, potentially after a retry */
746 tdb_transaction_cancel(h->ctx->wtdb->tdb);
747 talloc_free(h);
748 ctx->transaction = NULL;
749 return 0;
752 /* tell ctdbd to commit to the other nodes */
753 rets = ctdbd_control_local(messaging_ctdbd_connection(),
754 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
755 h->ctx->db_id, 0,
756 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
757 if (!NT_STATUS_IS_OK(rets) || status != 0) {
758 tdb_transaction_cancel(h->ctx->wtdb->tdb);
759 sleep(1);
761 if (!NT_STATUS_IS_OK(rets)) {
762 failure_control = CTDB_CONTROL_TRANS2_ERROR;
763 } else {
764 /* work out what error code we will give if we
765 have to fail the operation */
766 switch ((enum ctdb_trans2_commit_error)status) {
767 case CTDB_TRANS2_COMMIT_SUCCESS:
768 case CTDB_TRANS2_COMMIT_SOMEFAIL:
769 case CTDB_TRANS2_COMMIT_TIMEOUT:
770 failure_control = CTDB_CONTROL_TRANS2_ERROR;
771 break;
772 case CTDB_TRANS2_COMMIT_ALLFAIL:
773 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
774 break;
778 if (++retries == 5) {
779 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
780 h->ctx->db_id, retries, (unsigned)failure_control));
781 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
782 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
783 tdb_null, NULL, NULL, NULL);
784 h->ctx->transaction = NULL;
785 talloc_free(h);
786 ctx->transaction = NULL;
787 return -1;
790 if (ctdb_replay_transaction(h) != 0) {
791 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
792 (unsigned)failure_control));
793 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
794 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
795 tdb_null, NULL, NULL, NULL);
796 h->ctx->transaction = NULL;
797 talloc_free(h);
798 ctx->transaction = NULL;
799 return -1;
801 goto again;
802 } else {
803 failure_control = CTDB_CONTROL_TRANS2_ERROR;
806 /* do the real commit locally */
807 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
808 if (ret != 0) {
809 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
810 (unsigned)failure_control));
811 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
812 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
813 h->ctx->transaction = NULL;
814 talloc_free(h);
815 return ret;
818 /* tell ctdbd that we are finished with our local commit */
819 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
820 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
821 tdb_null, NULL, NULL, NULL);
822 h->ctx->transaction = NULL;
823 talloc_free(h);
824 return 0;
829 cancel a transaction
831 static int db_ctdb_transaction_cancel(struct db_context *db)
833 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
834 struct db_ctdb_ctx);
835 struct db_ctdb_transaction_handle *h = ctx->transaction;
837 if (h == NULL) {
838 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
839 return -1;
842 if (h->nesting != 0) {
843 h->nesting--;
844 h->nested_cancel = true;
845 return 0;
848 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
850 ctx->transaction = NULL;
851 talloc_free(h);
852 return 0;
856 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
858 struct db_ctdb_rec *crec = talloc_get_type_abort(
859 rec->private_data, struct db_ctdb_rec);
861 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
866 static NTSTATUS db_ctdb_delete(struct db_record *rec)
868 TDB_DATA data;
871 * We have to store the header with empty data. TODO: Fix the
872 * tdb-level cleanup
875 ZERO_STRUCT(data);
877 return db_ctdb_store(rec, data, 0);
881 static int db_ctdb_record_destr(struct db_record* data)
883 struct db_ctdb_rec *crec = talloc_get_type_abort(
884 data->private_data, struct db_ctdb_rec);
886 DEBUG(10, (DEBUGLEVEL > 10
887 ? "Unlocking db %u key %s\n"
888 : "Unlocking db %u key %.20s\n",
889 (int)crec->ctdb_ctx->db_id,
890 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
891 data->key.dsize)));
893 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
894 DEBUG(0, ("tdb_chainunlock failed\n"));
895 return -1;
898 return 0;
901 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
902 TALLOC_CTX *mem_ctx,
903 TDB_DATA key,
904 bool persistent)
906 struct db_record *result;
907 struct db_ctdb_rec *crec;
908 NTSTATUS status;
909 TDB_DATA ctdb_data;
910 int migrate_attempts = 0;
912 if (!(result = talloc(mem_ctx, struct db_record))) {
913 DEBUG(0, ("talloc failed\n"));
914 return NULL;
917 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
918 DEBUG(0, ("talloc failed\n"));
919 TALLOC_FREE(result);
920 return NULL;
923 result->private_data = (void *)crec;
924 crec->ctdb_ctx = ctx;
926 result->key.dsize = key.dsize;
927 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
928 if (result->key.dptr == NULL) {
929 DEBUG(0, ("talloc failed\n"));
930 TALLOC_FREE(result);
931 return NULL;
935 * Do a blocking lock on the record
937 again:
939 if (DEBUGLEVEL >= 10) {
940 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
941 DEBUG(10, (DEBUGLEVEL > 10
942 ? "Locking db %u key %s\n"
943 : "Locking db %u key %.20s\n",
944 (int)crec->ctdb_ctx->db_id, keystr));
945 TALLOC_FREE(keystr);
948 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
949 DEBUG(3, ("tdb_chainlock failed\n"));
950 TALLOC_FREE(result);
951 return NULL;
954 result->store = db_ctdb_store;
955 result->delete_rec = db_ctdb_delete;
956 talloc_set_destructor(result, db_ctdb_record_destr);
958 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
961 * See if we have a valid record and we are the dmaster. If so, we can
962 * take the shortcut and just return it.
965 if ((ctdb_data.dptr == NULL) ||
966 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
967 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
968 #if 0
969 || (random() % 2 != 0)
970 #endif
972 SAFE_FREE(ctdb_data.dptr);
973 tdb_chainunlock(ctx->wtdb->tdb, key);
974 talloc_set_destructor(result, NULL);
976 migrate_attempts += 1;
978 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
979 ctdb_data.dptr, ctdb_data.dptr ?
980 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
981 get_my_vnn()));
983 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
984 if (!NT_STATUS_IS_OK(status)) {
985 DEBUG(5, ("ctdb_migrate failed: %s\n",
986 nt_errstr(status)));
987 TALLOC_FREE(result);
988 return NULL;
990 /* now its migrated, try again */
991 goto again;
994 if (migrate_attempts > 10) {
995 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
996 migrate_attempts));
999 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1001 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1002 result->value.dptr = NULL;
1004 if ((result->value.dsize != 0)
1005 && !(result->value.dptr = (uint8 *)talloc_memdup(
1006 result, ctdb_data.dptr + sizeof(crec->header),
1007 result->value.dsize))) {
1008 DEBUG(0, ("talloc failed\n"));
1009 TALLOC_FREE(result);
1012 SAFE_FREE(ctdb_data.dptr);
1014 return result;
1017 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1018 TALLOC_CTX *mem_ctx,
1019 TDB_DATA key)
1021 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1022 struct db_ctdb_ctx);
1024 if (ctx->transaction != NULL) {
1025 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1028 if (db->persistent) {
1029 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1032 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1036 fetch (unlocked, no migration) operation on ctdb
1038 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1039 TDB_DATA key, TDB_DATA *data)
1041 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1042 struct db_ctdb_ctx);
1043 NTSTATUS status;
1044 TDB_DATA ctdb_data;
1046 if (ctx->transaction) {
1047 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1050 /* try a direct fetch */
1051 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1054 * See if we have a valid record and we are the dmaster. If so, we can
1055 * take the shortcut and just return it.
1056 * we bypass the dmaster check for persistent databases
1058 if ((ctdb_data.dptr != NULL) &&
1059 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1060 (db->persistent ||
1061 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1062 /* we are the dmaster - avoid the ctdb protocol op */
1064 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1065 if (data->dsize == 0) {
1066 SAFE_FREE(ctdb_data.dptr);
1067 data->dptr = NULL;
1068 return 0;
1071 data->dptr = (uint8 *)talloc_memdup(
1072 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1073 data->dsize);
1075 SAFE_FREE(ctdb_data.dptr);
1077 if (data->dptr == NULL) {
1078 return -1;
1080 return 0;
1083 SAFE_FREE(ctdb_data.dptr);
1085 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1086 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1087 if (!NT_STATUS_IS_OK(status)) {
1088 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1089 return -1;
1092 return 0;
/*
 * Context handed to the traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1101 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1103 struct traverse_state *state = (struct traverse_state *)private_data;
1104 struct db_record *rec;
1105 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1106 /* we have to give them a locked record to prevent races */
1107 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1108 if (rec && rec->value.dsize > 0) {
1109 state->fn(rec, state->private_data);
1111 talloc_free(tmp_ctx);
1114 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1115 void *private_data)
1117 struct traverse_state *state = (struct traverse_state *)private_data;
1118 struct db_record *rec;
1119 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1120 int ret = 0;
1121 /* we have to give them a locked record to prevent races */
1122 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1123 if (rec && rec->value.dsize > 0) {
1124 ret = state->fn(rec, state->private_data);
1126 talloc_free(tmp_ctx);
1127 return ret;
1130 static int db_ctdb_traverse(struct db_context *db,
1131 int (*fn)(struct db_record *rec,
1132 void *private_data),
1133 void *private_data)
1135 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1136 struct db_ctdb_ctx);
1137 struct traverse_state state;
1139 state.db = db;
1140 state.fn = fn;
1141 state.private_data = private_data;
1143 if (db->persistent) {
1144 /* for persistent databases we don't need to do a ctdb traverse,
1145 we can do a faster local traverse */
1146 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1150 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1151 return 0;
1154 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1156 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1159 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1161 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1164 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1166 struct traverse_state *state = (struct traverse_state *)private_data;
1167 struct db_record rec;
1168 rec.key = key;
1169 rec.value = data;
1170 rec.store = db_ctdb_store_deny;
1171 rec.delete_rec = db_ctdb_delete_deny;
1172 rec.private_data = state->db;
1173 state->fn(&rec, state->private_data);
1176 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1177 void *private_data)
1179 struct traverse_state *state = (struct traverse_state *)private_data;
1180 struct db_record rec;
1181 rec.key = kbuf;
1182 rec.value = dbuf;
1183 rec.store = db_ctdb_store_deny;
1184 rec.delete_rec = db_ctdb_delete_deny;
1185 rec.private_data = state->db;
1187 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1188 /* a deleted record */
1189 return 0;
1191 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1192 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1194 return state->fn(&rec, state->private_data);
1197 static int db_ctdb_traverse_read(struct db_context *db,
1198 int (*fn)(struct db_record *rec,
1199 void *private_data),
1200 void *private_data)
1202 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1203 struct db_ctdb_ctx);
1204 struct traverse_state state;
1206 state.db = db;
1207 state.fn = fn;
1208 state.private_data = private_data;
1210 if (db->persistent) {
1211 /* for persistent databases we don't need to do a ctdb traverse,
1212 we can do a faster local traverse */
1213 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1216 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1217 return 0;
1220 static int db_ctdb_get_seqnum(struct db_context *db)
1222 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1223 struct db_ctdb_ctx);
1224 return tdb_get_seqnum(ctx->wtdb->tdb);
1227 static int db_ctdb_get_flags(struct db_context *db)
1229 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1230 struct db_ctdb_ctx);
1231 return tdb_get_flags(ctx->wtdb->tdb);
1234 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1235 const char *name,
1236 int hash_size, int tdb_flags,
1237 int open_flags, mode_t mode)
1239 struct db_context *result;
1240 struct db_ctdb_ctx *db_ctdb;
1241 char *db_path;
1243 if (!lp_clustering()) {
1244 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1245 return NULL;
1248 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1249 DEBUG(0, ("talloc failed\n"));
1250 TALLOC_FREE(result);
1251 return NULL;
1254 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1255 DEBUG(0, ("talloc failed\n"));
1256 TALLOC_FREE(result);
1257 return NULL;
1260 db_ctdb->transaction = NULL;
1261 db_ctdb->db = result;
1263 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1264 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1265 TALLOC_FREE(result);
1266 return NULL;
1269 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1271 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1273 /* only pass through specific flags */
1274 tdb_flags &= TDB_SEQNUM;
1276 /* honor permissions if user has specified O_CREAT */
1277 if (open_flags & O_CREAT) {
1278 chmod(db_path, mode);
1281 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1282 if (db_ctdb->wtdb == NULL) {
1283 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1284 TALLOC_FREE(result);
1285 return NULL;
1287 talloc_free(db_path);
1289 result->private_data = (void *)db_ctdb;
1290 result->fetch_locked = db_ctdb_fetch_locked;
1291 result->fetch = db_ctdb_fetch;
1292 result->traverse = db_ctdb_traverse;
1293 result->traverse_read = db_ctdb_traverse_read;
1294 result->get_seqnum = db_ctdb_get_seqnum;
1295 result->get_flags = db_ctdb_get_flags;
1296 result->transaction_start = db_ctdb_transaction_start;
1297 result->transaction_commit = db_ctdb_transaction_commit;
1298 result->transaction_cancel = db_ctdb_transaction_cancel;
1300 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1301 name, db_ctdb->db_id));
1303 return result;
1305 #endif