s3:dbwrap_ctdb: fix a race in starting concurrent transactions on a single node
[Samba/ekacnet.git] / source3 / lib / dbwrap_ctdb.c
blob49df62afd305d06d94154f0196a64ed8a77afa75
/*
   Unix SMB/CIFS implementation.
   Database interface wrapper around ctdbd
   Copyright (C) Volker Lendecke 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State of one open (possibly nested) transaction on a ctdb database.
 */
struct db_ctdb_transaction_handle {
	/* database context the transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* true while a commit is being replayed; reads and writes are
	 * then not recorded into m_all a second time */
	bool in_replay;
	/*
	 * Operations done under the transaction are recorded in two
	 * marshall buffers:
	 * - m_all holds both reads and writes,
	 * - m_write holds only the writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of nested transaction_start calls not yet closed */
	uint32_t nesting;
	/* set when a nested level was cancelled; makes the commit fail */
	bool nested_cancel;
};
40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
78 /**
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TALLOC_CTX *mem_ctx,
86 TDB_DATA *data)
88 TDB_DATA rec;
89 NTSTATUS status;
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
94 if (data) {
95 ZERO_STRUCTP(data);
97 if (header) {
98 header->dmaster = (uint32_t)-1;
99 header->rsn = 0;
101 goto done;
104 if (header) {
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
108 if (data) {
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
111 data->dptr = NULL;
112 } else {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114 rec.dptr
115 + sizeof(struct ctdb_ltdb_header),
116 data->dsize);
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
119 goto done;
124 status = NT_STATUS_OK;
126 done:
127 SAFE_FREE(rec.dptr);
128 return status;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136 TDB_DATA key,
137 struct ctdb_ltdb_header *header,
138 TDB_DATA data)
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
141 TDB_DATA rec;
142 int ret;
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
168 of the record
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
171 TDB_DATA key,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA data)
175 size_t length;
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181 if (d == NULL) {
182 return NULL;
184 d->length = length;
185 d->reqid = reqid;
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
188 if (header) {
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192 } else {
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
196 return d;
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
203 uint64_t db_id,
204 uint32_t reqid,
205 TDB_DATA key,
206 struct ctdb_ltdb_header *header,
207 TDB_DATA data)
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214 if (r == NULL) {
215 talloc_free(m);
216 return NULL;
219 if (m == NULL) {
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222 if (m == NULL) {
223 goto done;
225 m->db_id = db_id;
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
233 if (m2 == NULL) {
234 talloc_free(m);
235 goto done;
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
240 m2->count++;
242 done:
243 talloc_free(r);
244 return m2;
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
250 TDB_DATA data;
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
253 return data;
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263 uint32_t *reqid,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
267 if (r == NULL) {
268 r = (struct ctdb_rec_data *)&m->data[0];
269 } else {
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
273 if (reqid != NULL) {
274 *reqid = r->reqid;
277 if (key != NULL) {
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
281 if (data != NULL) {
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
292 return NULL;
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
297 return r;
303 * CTDB transaction destructor
305 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
307 tdb_transaction_cancel(h->ctx->wtdb->tdb);
308 return 0;
312 * start a transaction on a ctdb database:
313 * - lock the transaction lock key
314 * - start the tdb transaction
316 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
318 struct db_record *rh;
319 struct db_ctdb_rec *crec;
320 TDB_DATA key;
321 TALLOC_CTX *tmp_ctx;
322 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
323 int ret;
324 struct db_ctdb_ctx *ctx = h->ctx;
325 TDB_DATA data;
326 pid_t pid;
327 NTSTATUS status;
328 struct ctdb_ltdb_header header;
330 key.dptr = (uint8_t *)discard_const(keyname);
331 key.dsize = strlen(keyname);
333 again:
334 tmp_ctx = talloc_new(h);
336 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
337 if (rh == NULL) {
338 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
339 talloc_free(tmp_ctx);
340 return -1;
342 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
345 * store the pid in the database:
346 * it is not enought that the node is dmaster...
348 pid = getpid();
349 data.dptr = (unsigned char *)&pid;
350 data.dsize = sizeof(pid_t);
351 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
352 if (!NT_STATUS_IS_OK(status)) {
353 DEBUG(0, (__location__ " Failed to store pid in transaction "
354 "record: %s\n", nt_errstr(status)));
355 talloc_free(tmp_ctx);
356 return -1;
359 talloc_free(rh);
361 ret = tdb_transaction_start(ctx->wtdb->tdb);
362 if (ret != 0) {
363 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
364 talloc_free(tmp_ctx);
365 return -1;
368 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
369 if (!NT_STATUS_IS_OK(status) || header.dmaster != get_my_vnn()) {
370 tdb_transaction_cancel(ctx->wtdb->tdb);
371 talloc_free(tmp_ctx);
372 goto again;
375 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
376 tdb_transaction_cancel(ctx->wtdb->tdb);
377 talloc_free(tmp_ctx);
378 goto again;
381 talloc_free(tmp_ctx);
383 return 0;
388 * CTDB dbwrap API: transaction_start function
389 * starts a transaction on a persistent database
391 static int db_ctdb_transaction_start(struct db_context *db)
393 struct db_ctdb_transaction_handle *h;
394 int ret;
395 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
396 struct db_ctdb_ctx);
398 if (!db->persistent) {
399 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
400 ctx->db_id));
401 return -1;
404 if (ctx->transaction) {
405 ctx->transaction->nesting++;
406 return 0;
409 h = talloc_zero(db, struct db_ctdb_transaction_handle);
410 if (h == NULL) {
411 DEBUG(0,(__location__ " oom for transaction handle\n"));
412 return -1;
415 h->ctx = ctx;
417 ret = db_ctdb_transaction_fetch_start(h);
418 if (ret != 0) {
419 talloc_free(h);
420 return -1;
423 talloc_set_destructor(h, db_ctdb_transaction_destructor);
425 ctx->transaction = h;
427 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
429 return 0;
435 fetch a record inside a transaction
437 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
438 TALLOC_CTX *mem_ctx,
439 TDB_DATA key, TDB_DATA *data)
441 struct db_ctdb_transaction_handle *h = db->transaction;
442 NTSTATUS status;
444 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
446 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
447 *data = tdb_null;
448 } else if (!NT_STATUS_IS_OK(status)) {
449 return -1;
452 if (!h->in_replay) {
453 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
454 if (h->m_all == NULL) {
455 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
456 data->dsize = 0;
457 talloc_free(data->dptr);
458 return -1;
462 return 0;
466 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
467 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
469 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
470 TALLOC_CTX *mem_ctx,
471 TDB_DATA key)
473 struct db_record *result;
474 TDB_DATA ctdb_data;
476 if (!(result = talloc(mem_ctx, struct db_record))) {
477 DEBUG(0, ("talloc failed\n"));
478 return NULL;
481 result->private_data = ctx->transaction;
483 result->key.dsize = key.dsize;
484 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
485 if (result->key.dptr == NULL) {
486 DEBUG(0, ("talloc failed\n"));
487 TALLOC_FREE(result);
488 return NULL;
491 result->store = db_ctdb_store_transaction;
492 result->delete_rec = db_ctdb_delete_transaction;
494 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
495 if (ctdb_data.dptr == NULL) {
496 /* create the record */
497 result->value = tdb_null;
498 return result;
501 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
502 result->value.dptr = NULL;
504 if ((result->value.dsize != 0)
505 && !(result->value.dptr = (uint8 *)talloc_memdup(
506 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
507 result->value.dsize))) {
508 DEBUG(0, ("talloc failed\n"));
509 TALLOC_FREE(result);
512 SAFE_FREE(ctdb_data.dptr);
514 return result;
517 static int db_ctdb_record_destructor(struct db_record **recp)
519 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
520 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
521 rec->private_data, struct db_ctdb_transaction_handle);
522 int ret = h->ctx->db->transaction_commit(h->ctx->db);
523 if (ret != 0) {
524 DEBUG(0,(__location__ " transaction_commit failed\n"));
526 return 0;
530 auto-create a transaction for persistent databases
532 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
533 TALLOC_CTX *mem_ctx,
534 TDB_DATA key)
536 int res;
537 struct db_record *rec, **recp;
539 res = db_ctdb_transaction_start(ctx->db);
540 if (res == -1) {
541 return NULL;
544 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
545 if (rec == NULL) {
546 ctx->db->transaction_cancel(ctx->db);
547 return NULL;
550 /* destroy this transaction when we release the lock */
551 recp = talloc(rec, struct db_record *);
552 if (recp == NULL) {
553 ctx->db->transaction_cancel(ctx->db);
554 talloc_free(rec);
555 return NULL;
557 *recp = rec;
558 talloc_set_destructor(recp, db_ctdb_record_destructor);
559 return rec;
564 stores a record inside a transaction
566 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
567 TDB_DATA key, TDB_DATA data)
569 TALLOC_CTX *tmp_ctx = talloc_new(h);
570 int ret;
571 TDB_DATA rec;
572 struct ctdb_ltdb_header header;
573 NTSTATUS status;
575 /* we need the header so we can update the RSN */
576 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
577 if (rec.dptr == NULL) {
578 /* the record doesn't exist - create one with us as dmaster.
579 This is only safe because we are in a transaction and this
580 is a persistent database */
581 ZERO_STRUCT(header);
582 } else {
583 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
584 rec.dsize -= sizeof(struct ctdb_ltdb_header);
585 /* a special case, we are writing the same data that is there now */
586 if (data.dsize == rec.dsize &&
587 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
588 SAFE_FREE(rec.dptr);
589 talloc_free(tmp_ctx);
590 return 0;
592 SAFE_FREE(rec.dptr);
595 header.dmaster = get_my_vnn();
596 header.rsn++;
598 if (!h->in_replay) {
599 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
600 if (h->m_all == NULL) {
601 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
602 talloc_free(tmp_ctx);
603 return -1;
607 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
608 if (h->m_write == NULL) {
609 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
610 talloc_free(tmp_ctx);
611 return -1;
614 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
615 if (NT_STATUS_IS_OK(status)) {
616 ret = 0;
617 } else {
618 ret = -1;
621 talloc_free(tmp_ctx);
623 return ret;
628 a record store inside a transaction
630 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
632 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
633 rec->private_data, struct db_ctdb_transaction_handle);
634 int ret;
636 ret = db_ctdb_transaction_store(h, rec->key, data);
637 if (ret != 0) {
638 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
640 return NT_STATUS_OK;
644 a record delete inside a transaction
646 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
648 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
649 rec->private_data, struct db_ctdb_transaction_handle);
650 int ret;
652 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
653 if (ret != 0) {
654 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
656 return NT_STATUS_OK;
661 replay a transaction
663 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
665 int ret, i;
666 struct ctdb_rec_data *rec = NULL;
668 h->in_replay = true;
669 talloc_free(h->m_write);
670 h->m_write = NULL;
672 ret = db_ctdb_transaction_fetch_start(h);
673 if (ret != 0) {
674 return ret;
677 for (i=0;i<h->m_all->count;i++) {
678 TDB_DATA key, data;
680 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
681 if (rec == NULL) {
682 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
683 goto failed;
686 if (rec->reqid == 0) {
687 /* its a store */
688 if (db_ctdb_transaction_store(h, key, data) != 0) {
689 goto failed;
691 } else {
692 TDB_DATA data2;
693 TALLOC_CTX *tmp_ctx = talloc_new(h);
695 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
696 talloc_free(tmp_ctx);
697 goto failed;
699 if (data2.dsize != data.dsize ||
700 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
701 /* the record has changed on us - we have to give up */
702 talloc_free(tmp_ctx);
703 goto failed;
705 talloc_free(tmp_ctx);
709 return 0;
711 failed:
712 tdb_transaction_cancel(h->ctx->wtdb->tdb);
713 return -1;
718 commit a transaction
720 static int db_ctdb_transaction_commit(struct db_context *db)
722 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
723 struct db_ctdb_ctx);
724 NTSTATUS rets;
725 int ret;
726 int status;
727 int retries = 0;
728 struct db_ctdb_transaction_handle *h = ctx->transaction;
729 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
731 if (h == NULL) {
732 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
733 return -1;
736 if (h->nested_cancel) {
737 db->transaction_cancel(db);
738 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
739 return -1;
742 if (h->nesting != 0) {
743 h->nesting--;
744 return 0;
747 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
749 talloc_set_destructor(h, NULL);
751 /* our commit strategy is quite complex.
753 - we first try to commit the changes to all other nodes
755 - if that works, then we commit locally and we are done
757 - if a commit on another node fails, then we need to cancel
758 the transaction, then restart the transaction (thus
759 opening a window of time for a pending recovery to
760 complete), then replay the transaction, checking all the
761 reads and writes (checking that reads give the same data,
762 and writes succeed). Then we retry the transaction to the
763 other nodes
766 again:
767 if (h->m_write == NULL) {
768 /* no changes were made, potentially after a retry */
769 tdb_transaction_cancel(h->ctx->wtdb->tdb);
770 talloc_free(h);
771 ctx->transaction = NULL;
772 return 0;
775 /* tell ctdbd to commit to the other nodes */
776 rets = ctdbd_control_local(messaging_ctdbd_connection(),
777 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
778 h->ctx->db_id, 0,
779 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
780 if (!NT_STATUS_IS_OK(rets) || status != 0) {
781 tdb_transaction_cancel(h->ctx->wtdb->tdb);
782 sleep(1);
784 if (!NT_STATUS_IS_OK(rets)) {
785 failure_control = CTDB_CONTROL_TRANS2_ERROR;
786 } else {
787 /* work out what error code we will give if we
788 have to fail the operation */
789 switch ((enum ctdb_trans2_commit_error)status) {
790 case CTDB_TRANS2_COMMIT_SUCCESS:
791 case CTDB_TRANS2_COMMIT_SOMEFAIL:
792 case CTDB_TRANS2_COMMIT_TIMEOUT:
793 failure_control = CTDB_CONTROL_TRANS2_ERROR;
794 break;
795 case CTDB_TRANS2_COMMIT_ALLFAIL:
796 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
797 break;
801 if (++retries == 5) {
802 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
803 h->ctx->db_id, retries, (unsigned)failure_control));
804 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
805 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
806 tdb_null, NULL, NULL, NULL);
807 h->ctx->transaction = NULL;
808 talloc_free(h);
809 ctx->transaction = NULL;
810 return -1;
813 if (ctdb_replay_transaction(h) != 0) {
814 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
815 (unsigned)failure_control));
816 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
817 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
818 tdb_null, NULL, NULL, NULL);
819 h->ctx->transaction = NULL;
820 talloc_free(h);
821 ctx->transaction = NULL;
822 return -1;
824 goto again;
825 } else {
826 failure_control = CTDB_CONTROL_TRANS2_ERROR;
829 /* do the real commit locally */
830 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
831 if (ret != 0) {
832 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
833 (unsigned)failure_control));
834 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
835 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
836 h->ctx->transaction = NULL;
837 talloc_free(h);
838 return ret;
841 /* tell ctdbd that we are finished with our local commit */
842 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
843 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
844 tdb_null, NULL, NULL, NULL);
845 h->ctx->transaction = NULL;
846 talloc_free(h);
847 return 0;
852 cancel a transaction
854 static int db_ctdb_transaction_cancel(struct db_context *db)
856 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
857 struct db_ctdb_ctx);
858 struct db_ctdb_transaction_handle *h = ctx->transaction;
860 if (h == NULL) {
861 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
862 return -1;
865 if (h->nesting != 0) {
866 h->nesting--;
867 h->nested_cancel = true;
868 return 0;
871 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
873 ctx->transaction = NULL;
874 talloc_free(h);
875 return 0;
879 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
881 struct db_ctdb_rec *crec = talloc_get_type_abort(
882 rec->private_data, struct db_ctdb_rec);
884 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
889 static NTSTATUS db_ctdb_delete(struct db_record *rec)
891 TDB_DATA data;
894 * We have to store the header with empty data. TODO: Fix the
895 * tdb-level cleanup
898 ZERO_STRUCT(data);
900 return db_ctdb_store(rec, data, 0);
904 static int db_ctdb_record_destr(struct db_record* data)
906 struct db_ctdb_rec *crec = talloc_get_type_abort(
907 data->private_data, struct db_ctdb_rec);
909 DEBUG(10, (DEBUGLEVEL > 10
910 ? "Unlocking db %u key %s\n"
911 : "Unlocking db %u key %.20s\n",
912 (int)crec->ctdb_ctx->db_id,
913 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
914 data->key.dsize)));
916 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
917 DEBUG(0, ("tdb_chainunlock failed\n"));
918 return -1;
921 return 0;
924 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
925 TALLOC_CTX *mem_ctx,
926 TDB_DATA key,
927 bool persistent)
929 struct db_record *result;
930 struct db_ctdb_rec *crec;
931 NTSTATUS status;
932 TDB_DATA ctdb_data;
933 int migrate_attempts = 0;
935 if (!(result = talloc(mem_ctx, struct db_record))) {
936 DEBUG(0, ("talloc failed\n"));
937 return NULL;
940 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
941 DEBUG(0, ("talloc failed\n"));
942 TALLOC_FREE(result);
943 return NULL;
946 result->private_data = (void *)crec;
947 crec->ctdb_ctx = ctx;
949 result->key.dsize = key.dsize;
950 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
951 if (result->key.dptr == NULL) {
952 DEBUG(0, ("talloc failed\n"));
953 TALLOC_FREE(result);
954 return NULL;
958 * Do a blocking lock on the record
960 again:
962 if (DEBUGLEVEL >= 10) {
963 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
964 DEBUG(10, (DEBUGLEVEL > 10
965 ? "Locking db %u key %s\n"
966 : "Locking db %u key %.20s\n",
967 (int)crec->ctdb_ctx->db_id, keystr));
968 TALLOC_FREE(keystr);
971 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
972 DEBUG(3, ("tdb_chainlock failed\n"));
973 TALLOC_FREE(result);
974 return NULL;
977 result->store = db_ctdb_store;
978 result->delete_rec = db_ctdb_delete;
979 talloc_set_destructor(result, db_ctdb_record_destr);
981 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
984 * See if we have a valid record and we are the dmaster. If so, we can
985 * take the shortcut and just return it.
988 if ((ctdb_data.dptr == NULL) ||
989 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
990 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
991 #if 0
992 || (random() % 2 != 0)
993 #endif
995 SAFE_FREE(ctdb_data.dptr);
996 tdb_chainunlock(ctx->wtdb->tdb, key);
997 talloc_set_destructor(result, NULL);
999 migrate_attempts += 1;
1001 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1002 ctdb_data.dptr, ctdb_data.dptr ?
1003 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1004 get_my_vnn()));
1006 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1007 if (!NT_STATUS_IS_OK(status)) {
1008 DEBUG(5, ("ctdb_migrate failed: %s\n",
1009 nt_errstr(status)));
1010 TALLOC_FREE(result);
1011 return NULL;
1013 /* now its migrated, try again */
1014 goto again;
1017 if (migrate_attempts > 10) {
1018 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1019 migrate_attempts));
1022 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1024 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1025 result->value.dptr = NULL;
1027 if ((result->value.dsize != 0)
1028 && !(result->value.dptr = (uint8 *)talloc_memdup(
1029 result, ctdb_data.dptr + sizeof(crec->header),
1030 result->value.dsize))) {
1031 DEBUG(0, ("talloc failed\n"));
1032 TALLOC_FREE(result);
1035 SAFE_FREE(ctdb_data.dptr);
1037 return result;
1040 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1041 TALLOC_CTX *mem_ctx,
1042 TDB_DATA key)
1044 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1045 struct db_ctdb_ctx);
1047 if (ctx->transaction != NULL) {
1048 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1051 if (db->persistent) {
1052 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1055 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1059 fetch (unlocked, no migration) operation on ctdb
1061 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1062 TDB_DATA key, TDB_DATA *data)
1064 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1065 struct db_ctdb_ctx);
1066 NTSTATUS status;
1067 TDB_DATA ctdb_data;
1069 if (ctx->transaction) {
1070 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1073 /* try a direct fetch */
1074 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1077 * See if we have a valid record and we are the dmaster. If so, we can
1078 * take the shortcut and just return it.
1079 * we bypass the dmaster check for persistent databases
1081 if ((ctdb_data.dptr != NULL) &&
1082 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1083 (db->persistent ||
1084 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1085 /* we are the dmaster - avoid the ctdb protocol op */
1087 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1088 if (data->dsize == 0) {
1089 SAFE_FREE(ctdb_data.dptr);
1090 data->dptr = NULL;
1091 return 0;
1094 data->dptr = (uint8 *)talloc_memdup(
1095 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1096 data->dsize);
1098 SAFE_FREE(ctdb_data.dptr);
1100 if (data->dptr == NULL) {
1101 return -1;
1103 return 0;
1106 SAFE_FREE(ctdb_data.dptr);
1108 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1109 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1110 if (!NT_STATUS_IS_OK(status)) {
1111 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1112 return -1;
1115 return 0;
/*
 * Context passed through the tdb/ctdbd traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* user callback invoked per record */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer handed to fn */
	void *private_data;
};
1124 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1126 struct traverse_state *state = (struct traverse_state *)private_data;
1127 struct db_record *rec;
1128 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1129 /* we have to give them a locked record to prevent races */
1130 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1131 if (rec && rec->value.dsize > 0) {
1132 state->fn(rec, state->private_data);
1134 talloc_free(tmp_ctx);
1137 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1138 void *private_data)
1140 struct traverse_state *state = (struct traverse_state *)private_data;
1141 struct db_record *rec;
1142 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1143 int ret = 0;
1144 /* we have to give them a locked record to prevent races */
1145 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1146 if (rec && rec->value.dsize > 0) {
1147 ret = state->fn(rec, state->private_data);
1149 talloc_free(tmp_ctx);
1150 return ret;
1153 static int db_ctdb_traverse(struct db_context *db,
1154 int (*fn)(struct db_record *rec,
1155 void *private_data),
1156 void *private_data)
1158 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1159 struct db_ctdb_ctx);
1160 struct traverse_state state;
1162 state.db = db;
1163 state.fn = fn;
1164 state.private_data = private_data;
1166 if (db->persistent) {
1167 /* for persistent databases we don't need to do a ctdb traverse,
1168 we can do a faster local traverse */
1169 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1173 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1174 return 0;
1177 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1179 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1182 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1184 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1187 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1189 struct traverse_state *state = (struct traverse_state *)private_data;
1190 struct db_record rec;
1191 rec.key = key;
1192 rec.value = data;
1193 rec.store = db_ctdb_store_deny;
1194 rec.delete_rec = db_ctdb_delete_deny;
1195 rec.private_data = state->db;
1196 state->fn(&rec, state->private_data);
1199 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1200 void *private_data)
1202 struct traverse_state *state = (struct traverse_state *)private_data;
1203 struct db_record rec;
1204 rec.key = kbuf;
1205 rec.value = dbuf;
1206 rec.store = db_ctdb_store_deny;
1207 rec.delete_rec = db_ctdb_delete_deny;
1208 rec.private_data = state->db;
1210 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1211 /* a deleted record */
1212 return 0;
1214 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1215 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1217 return state->fn(&rec, state->private_data);
1220 static int db_ctdb_traverse_read(struct db_context *db,
1221 int (*fn)(struct db_record *rec,
1222 void *private_data),
1223 void *private_data)
1225 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1226 struct db_ctdb_ctx);
1227 struct traverse_state state;
1229 state.db = db;
1230 state.fn = fn;
1231 state.private_data = private_data;
1233 if (db->persistent) {
1234 /* for persistent databases we don't need to do a ctdb traverse,
1235 we can do a faster local traverse */
1236 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1239 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1240 return 0;
1243 static int db_ctdb_get_seqnum(struct db_context *db)
1245 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1246 struct db_ctdb_ctx);
1247 return tdb_get_seqnum(ctx->wtdb->tdb);
1250 static int db_ctdb_get_flags(struct db_context *db)
1252 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1253 struct db_ctdb_ctx);
1254 return tdb_get_flags(ctx->wtdb->tdb);
1257 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1258 const char *name,
1259 int hash_size, int tdb_flags,
1260 int open_flags, mode_t mode)
1262 struct db_context *result;
1263 struct db_ctdb_ctx *db_ctdb;
1264 char *db_path;
1266 if (!lp_clustering()) {
1267 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1268 return NULL;
1271 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1272 DEBUG(0, ("talloc failed\n"));
1273 TALLOC_FREE(result);
1274 return NULL;
1277 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1278 DEBUG(0, ("talloc failed\n"));
1279 TALLOC_FREE(result);
1280 return NULL;
1283 db_ctdb->transaction = NULL;
1284 db_ctdb->db = result;
1286 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1287 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1288 TALLOC_FREE(result);
1289 return NULL;
1292 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1294 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1296 /* only pass through specific flags */
1297 tdb_flags &= TDB_SEQNUM;
1299 /* honor permissions if user has specified O_CREAT */
1300 if (open_flags & O_CREAT) {
1301 chmod(db_path, mode);
1304 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1305 if (db_ctdb->wtdb == NULL) {
1306 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1307 TALLOC_FREE(result);
1308 return NULL;
1310 talloc_free(db_path);
1312 result->private_data = (void *)db_ctdb;
1313 result->fetch_locked = db_ctdb_fetch_locked;
1314 result->fetch = db_ctdb_fetch;
1315 result->traverse = db_ctdb_traverse;
1316 result->traverse_read = db_ctdb_traverse_read;
1317 result->get_seqnum = db_ctdb_get_seqnum;
1318 result->get_flags = db_ctdb_get_flags;
1319 result->transaction_start = db_ctdb_transaction_start;
1320 result->transaction_commit = db_ctdb_transaction_commit;
1321 result->transaction_cancel = db_ctdb_transaction_cancel;
1323 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1324 name, db_ctdb->db_id));
1326 return result;
1328 #endif