s3:dbwrap_ctdb: add new db_ctdb_transaction_active() that calls CTDB_CONTROL_TRANS2_C...
[Samba/bb.git] / source3 / lib / dbwrap_ctdb.c
blobec52d766c0604f5379e72fa41822bbcfc2c89be8
/*
   Unix SMB/CIFS implementation.
   Database interface wrapper around ctdbd
   Copyright (C) Volker Lendecke 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-transaction state attached to a db_ctdb_ctx while a transaction
 * is open.
 */
struct db_ctdb_transaction_handle {
	/* database context the transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/* true while ctdb_replay_transaction() re-executes the log */
	bool in_replay;
	/*
	 * we store the reads and writes done under a transaction:
	 * - one list stores both reads and writes (m_all),
	 * - the other just writes (m_write)
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of transaction_start calls nested inside the outer one */
	uint32_t nesting;
	/* set when a nested level was cancelled; makes the commit fail */
	bool nested_cancel;
};

40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
78 /**
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TALLOC_CTX *mem_ctx,
86 TDB_DATA *data)
88 TDB_DATA rec;
89 NTSTATUS status;
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
94 if (data) {
95 ZERO_STRUCTP(data);
97 if (header) {
98 header->dmaster = (uint32_t)-1;
99 header->rsn = 0;
101 goto done;
104 if (header) {
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
108 if (data) {
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
111 data->dptr = NULL;
112 } else {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114 rec.dptr
115 + sizeof(struct ctdb_ltdb_header),
116 data->dsize);
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
119 goto done;
124 status = NT_STATUS_OK;
126 done:
127 SAFE_FREE(rec.dptr);
128 return status;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136 TDB_DATA key,
137 struct ctdb_ltdb_header *header,
138 TDB_DATA data)
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
141 TDB_DATA rec;
142 int ret;
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
168 of the record
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
171 TDB_DATA key,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA data)
175 size_t length;
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181 if (d == NULL) {
182 return NULL;
184 d->length = length;
185 d->reqid = reqid;
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
188 if (header) {
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192 } else {
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
196 return d;
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
203 uint64_t db_id,
204 uint32_t reqid,
205 TDB_DATA key,
206 struct ctdb_ltdb_header *header,
207 TDB_DATA data)
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214 if (r == NULL) {
215 talloc_free(m);
216 return NULL;
219 if (m == NULL) {
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222 if (m == NULL) {
223 goto done;
225 m->db_id = db_id;
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
233 if (m2 == NULL) {
234 talloc_free(m);
235 goto done;
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
240 m2->count++;
242 done:
243 talloc_free(r);
244 return m2;
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
250 TDB_DATA data;
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
253 return data;
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263 uint32_t *reqid,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
267 if (r == NULL) {
268 r = (struct ctdb_rec_data *)&m->data[0];
269 } else {
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
273 if (reqid != NULL) {
274 *reqid = r->reqid;
277 if (key != NULL) {
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
281 if (data != NULL) {
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
292 return NULL;
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
297 return r;
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
303 int32_t status;
304 NTSTATUS ret;
305 TDB_DATA indata;
307 indata.dptr = (uint8_t *)&db_id;
308 indata.dsize = sizeof(db_id);
310 ret = ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312 indata, NULL, NULL, &status);
314 if (!NT_STATUS_IS_OK(ret)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316 return -1;
319 return status;
324 * CTDB transaction destructor
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
328 tdb_transaction_cancel(h->ctx->wtdb->tdb);
329 return 0;
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
339 struct db_record *rh;
340 struct db_ctdb_rec *crec;
341 TDB_DATA key;
342 TALLOC_CTX *tmp_ctx;
343 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344 int ret;
345 struct db_ctdb_ctx *ctx = h->ctx;
346 TDB_DATA data;
347 pid_t pid;
348 NTSTATUS status;
349 struct ctdb_ltdb_header header;
351 key.dptr = (uint8_t *)discard_const(keyname);
352 key.dsize = strlen(keyname);
354 again:
355 tmp_ctx = talloc_new(h);
357 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
358 if (rh == NULL) {
359 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
360 talloc_free(tmp_ctx);
361 return -1;
363 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
366 * store the pid in the database:
367 * it is not enought that the node is dmaster...
369 pid = getpid();
370 data.dptr = (unsigned char *)&pid;
371 data.dsize = sizeof(pid_t);
372 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
373 if (!NT_STATUS_IS_OK(status)) {
374 DEBUG(0, (__location__ " Failed to store pid in transaction "
375 "record: %s\n", nt_errstr(status)));
376 talloc_free(tmp_ctx);
377 return -1;
380 talloc_free(rh);
382 ret = tdb_transaction_start(ctx->wtdb->tdb);
383 if (ret != 0) {
384 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
385 talloc_free(tmp_ctx);
386 return -1;
389 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
390 if (!NT_STATUS_IS_OK(status) || header.dmaster != get_my_vnn()) {
391 tdb_transaction_cancel(ctx->wtdb->tdb);
392 talloc_free(tmp_ctx);
393 goto again;
396 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
397 tdb_transaction_cancel(ctx->wtdb->tdb);
398 talloc_free(tmp_ctx);
399 goto again;
402 talloc_free(tmp_ctx);
404 return 0;
409 * CTDB dbwrap API: transaction_start function
410 * starts a transaction on a persistent database
412 static int db_ctdb_transaction_start(struct db_context *db)
414 struct db_ctdb_transaction_handle *h;
415 int ret;
416 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
417 struct db_ctdb_ctx);
419 if (!db->persistent) {
420 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
421 ctx->db_id));
422 return -1;
425 if (ctx->transaction) {
426 ctx->transaction->nesting++;
427 return 0;
430 h = talloc_zero(db, struct db_ctdb_transaction_handle);
431 if (h == NULL) {
432 DEBUG(0,(__location__ " oom for transaction handle\n"));
433 return -1;
436 h->ctx = ctx;
438 ret = db_ctdb_transaction_fetch_start(h);
439 if (ret != 0) {
440 talloc_free(h);
441 return -1;
444 talloc_set_destructor(h, db_ctdb_transaction_destructor);
446 ctx->transaction = h;
448 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
450 return 0;
456 fetch a record inside a transaction
458 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
459 TALLOC_CTX *mem_ctx,
460 TDB_DATA key, TDB_DATA *data)
462 struct db_ctdb_transaction_handle *h = db->transaction;
463 NTSTATUS status;
465 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
467 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
468 *data = tdb_null;
469 } else if (!NT_STATUS_IS_OK(status)) {
470 return -1;
473 if (!h->in_replay) {
474 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
475 if (h->m_all == NULL) {
476 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
477 data->dsize = 0;
478 talloc_free(data->dptr);
479 return -1;
483 return 0;
487 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
488 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
490 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
491 TALLOC_CTX *mem_ctx,
492 TDB_DATA key)
494 struct db_record *result;
495 TDB_DATA ctdb_data;
497 if (!(result = talloc(mem_ctx, struct db_record))) {
498 DEBUG(0, ("talloc failed\n"));
499 return NULL;
502 result->private_data = ctx->transaction;
504 result->key.dsize = key.dsize;
505 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
506 if (result->key.dptr == NULL) {
507 DEBUG(0, ("talloc failed\n"));
508 TALLOC_FREE(result);
509 return NULL;
512 result->store = db_ctdb_store_transaction;
513 result->delete_rec = db_ctdb_delete_transaction;
515 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
516 if (ctdb_data.dptr == NULL) {
517 /* create the record */
518 result->value = tdb_null;
519 return result;
522 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
523 result->value.dptr = NULL;
525 if ((result->value.dsize != 0)
526 && !(result->value.dptr = (uint8 *)talloc_memdup(
527 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
528 result->value.dsize))) {
529 DEBUG(0, ("talloc failed\n"));
530 TALLOC_FREE(result);
533 SAFE_FREE(ctdb_data.dptr);
535 return result;
538 static int db_ctdb_record_destructor(struct db_record **recp)
540 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
541 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
542 rec->private_data, struct db_ctdb_transaction_handle);
543 int ret = h->ctx->db->transaction_commit(h->ctx->db);
544 if (ret != 0) {
545 DEBUG(0,(__location__ " transaction_commit failed\n"));
547 return 0;
551 auto-create a transaction for persistent databases
553 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
554 TALLOC_CTX *mem_ctx,
555 TDB_DATA key)
557 int res;
558 struct db_record *rec, **recp;
560 res = db_ctdb_transaction_start(ctx->db);
561 if (res == -1) {
562 return NULL;
565 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
566 if (rec == NULL) {
567 ctx->db->transaction_cancel(ctx->db);
568 return NULL;
571 /* destroy this transaction when we release the lock */
572 recp = talloc(rec, struct db_record *);
573 if (recp == NULL) {
574 ctx->db->transaction_cancel(ctx->db);
575 talloc_free(rec);
576 return NULL;
578 *recp = rec;
579 talloc_set_destructor(recp, db_ctdb_record_destructor);
580 return rec;
585 stores a record inside a transaction
587 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
588 TDB_DATA key, TDB_DATA data)
590 TALLOC_CTX *tmp_ctx = talloc_new(h);
591 int ret;
592 TDB_DATA rec;
593 struct ctdb_ltdb_header header;
594 NTSTATUS status;
596 /* we need the header so we can update the RSN */
597 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
598 if (rec.dptr == NULL) {
599 /* the record doesn't exist - create one with us as dmaster.
600 This is only safe because we are in a transaction and this
601 is a persistent database */
602 ZERO_STRUCT(header);
603 } else {
604 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
605 rec.dsize -= sizeof(struct ctdb_ltdb_header);
606 /* a special case, we are writing the same data that is there now */
607 if (data.dsize == rec.dsize &&
608 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
609 SAFE_FREE(rec.dptr);
610 talloc_free(tmp_ctx);
611 return 0;
613 SAFE_FREE(rec.dptr);
616 header.dmaster = get_my_vnn();
617 header.rsn++;
619 if (!h->in_replay) {
620 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
621 if (h->m_all == NULL) {
622 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
623 talloc_free(tmp_ctx);
624 return -1;
628 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
629 if (h->m_write == NULL) {
630 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
631 talloc_free(tmp_ctx);
632 return -1;
635 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
636 if (NT_STATUS_IS_OK(status)) {
637 ret = 0;
638 } else {
639 ret = -1;
642 talloc_free(tmp_ctx);
644 return ret;
649 a record store inside a transaction
651 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
653 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
654 rec->private_data, struct db_ctdb_transaction_handle);
655 int ret;
657 ret = db_ctdb_transaction_store(h, rec->key, data);
658 if (ret != 0) {
659 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
661 return NT_STATUS_OK;
665 a record delete inside a transaction
667 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
669 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
670 rec->private_data, struct db_ctdb_transaction_handle);
671 int ret;
673 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
674 if (ret != 0) {
675 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
677 return NT_STATUS_OK;
682 replay a transaction
684 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
686 int ret, i;
687 struct ctdb_rec_data *rec = NULL;
689 h->in_replay = true;
690 talloc_free(h->m_write);
691 h->m_write = NULL;
693 ret = db_ctdb_transaction_fetch_start(h);
694 if (ret != 0) {
695 return ret;
698 for (i=0;i<h->m_all->count;i++) {
699 TDB_DATA key, data;
701 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
702 if (rec == NULL) {
703 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
704 goto failed;
707 if (rec->reqid == 0) {
708 /* its a store */
709 if (db_ctdb_transaction_store(h, key, data) != 0) {
710 goto failed;
712 } else {
713 TDB_DATA data2;
714 TALLOC_CTX *tmp_ctx = talloc_new(h);
716 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
717 talloc_free(tmp_ctx);
718 goto failed;
720 if (data2.dsize != data.dsize ||
721 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
722 /* the record has changed on us - we have to give up */
723 talloc_free(tmp_ctx);
724 goto failed;
726 talloc_free(tmp_ctx);
730 return 0;
732 failed:
733 tdb_transaction_cancel(h->ctx->wtdb->tdb);
734 return -1;
739 commit a transaction
741 static int db_ctdb_transaction_commit(struct db_context *db)
743 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
744 struct db_ctdb_ctx);
745 NTSTATUS rets;
746 int ret;
747 int status;
748 int retries = 0;
749 struct db_ctdb_transaction_handle *h = ctx->transaction;
750 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
752 if (h == NULL) {
753 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
754 return -1;
757 if (h->nested_cancel) {
758 db->transaction_cancel(db);
759 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
760 return -1;
763 if (h->nesting != 0) {
764 h->nesting--;
765 return 0;
768 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
770 talloc_set_destructor(h, NULL);
772 /* our commit strategy is quite complex.
774 - we first try to commit the changes to all other nodes
776 - if that works, then we commit locally and we are done
778 - if a commit on another node fails, then we need to cancel
779 the transaction, then restart the transaction (thus
780 opening a window of time for a pending recovery to
781 complete), then replay the transaction, checking all the
782 reads and writes (checking that reads give the same data,
783 and writes succeed). Then we retry the transaction to the
784 other nodes
787 again:
788 if (h->m_write == NULL) {
789 /* no changes were made, potentially after a retry */
790 tdb_transaction_cancel(h->ctx->wtdb->tdb);
791 talloc_free(h);
792 ctx->transaction = NULL;
793 return 0;
796 /* tell ctdbd to commit to the other nodes */
797 rets = ctdbd_control_local(messaging_ctdbd_connection(),
798 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
799 h->ctx->db_id, 0,
800 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
801 if (!NT_STATUS_IS_OK(rets) || status != 0) {
802 tdb_transaction_cancel(h->ctx->wtdb->tdb);
803 sleep(1);
805 if (!NT_STATUS_IS_OK(rets)) {
806 failure_control = CTDB_CONTROL_TRANS2_ERROR;
807 } else {
808 /* work out what error code we will give if we
809 have to fail the operation */
810 switch ((enum ctdb_trans2_commit_error)status) {
811 case CTDB_TRANS2_COMMIT_SUCCESS:
812 case CTDB_TRANS2_COMMIT_SOMEFAIL:
813 case CTDB_TRANS2_COMMIT_TIMEOUT:
814 failure_control = CTDB_CONTROL_TRANS2_ERROR;
815 break;
816 case CTDB_TRANS2_COMMIT_ALLFAIL:
817 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
818 break;
822 if (++retries == 5) {
823 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
824 h->ctx->db_id, retries, (unsigned)failure_control));
825 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
826 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
827 tdb_null, NULL, NULL, NULL);
828 h->ctx->transaction = NULL;
829 talloc_free(h);
830 ctx->transaction = NULL;
831 return -1;
834 if (ctdb_replay_transaction(h) != 0) {
835 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
836 (unsigned)failure_control));
837 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
838 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
839 tdb_null, NULL, NULL, NULL);
840 h->ctx->transaction = NULL;
841 talloc_free(h);
842 ctx->transaction = NULL;
843 return -1;
845 goto again;
846 } else {
847 failure_control = CTDB_CONTROL_TRANS2_ERROR;
850 /* do the real commit locally */
851 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
852 if (ret != 0) {
853 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
854 (unsigned)failure_control));
855 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
856 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
857 h->ctx->transaction = NULL;
858 talloc_free(h);
859 return ret;
862 /* tell ctdbd that we are finished with our local commit */
863 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
864 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
865 tdb_null, NULL, NULL, NULL);
866 h->ctx->transaction = NULL;
867 talloc_free(h);
868 return 0;
873 cancel a transaction
875 static int db_ctdb_transaction_cancel(struct db_context *db)
877 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
878 struct db_ctdb_ctx);
879 struct db_ctdb_transaction_handle *h = ctx->transaction;
881 if (h == NULL) {
882 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
883 return -1;
886 if (h->nesting != 0) {
887 h->nesting--;
888 h->nested_cancel = true;
889 return 0;
892 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
894 ctx->transaction = NULL;
895 talloc_free(h);
896 return 0;
900 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
902 struct db_ctdb_rec *crec = talloc_get_type_abort(
903 rec->private_data, struct db_ctdb_rec);
905 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
910 static NTSTATUS db_ctdb_delete(struct db_record *rec)
912 TDB_DATA data;
915 * We have to store the header with empty data. TODO: Fix the
916 * tdb-level cleanup
919 ZERO_STRUCT(data);
921 return db_ctdb_store(rec, data, 0);
925 static int db_ctdb_record_destr(struct db_record* data)
927 struct db_ctdb_rec *crec = talloc_get_type_abort(
928 data->private_data, struct db_ctdb_rec);
930 DEBUG(10, (DEBUGLEVEL > 10
931 ? "Unlocking db %u key %s\n"
932 : "Unlocking db %u key %.20s\n",
933 (int)crec->ctdb_ctx->db_id,
934 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
935 data->key.dsize)));
937 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
938 DEBUG(0, ("tdb_chainunlock failed\n"));
939 return -1;
942 return 0;
945 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
946 TALLOC_CTX *mem_ctx,
947 TDB_DATA key,
948 bool persistent)
950 struct db_record *result;
951 struct db_ctdb_rec *crec;
952 NTSTATUS status;
953 TDB_DATA ctdb_data;
954 int migrate_attempts = 0;
956 if (!(result = talloc(mem_ctx, struct db_record))) {
957 DEBUG(0, ("talloc failed\n"));
958 return NULL;
961 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
962 DEBUG(0, ("talloc failed\n"));
963 TALLOC_FREE(result);
964 return NULL;
967 result->private_data = (void *)crec;
968 crec->ctdb_ctx = ctx;
970 result->key.dsize = key.dsize;
971 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
972 if (result->key.dptr == NULL) {
973 DEBUG(0, ("talloc failed\n"));
974 TALLOC_FREE(result);
975 return NULL;
979 * Do a blocking lock on the record
981 again:
983 if (DEBUGLEVEL >= 10) {
984 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
985 DEBUG(10, (DEBUGLEVEL > 10
986 ? "Locking db %u key %s\n"
987 : "Locking db %u key %.20s\n",
988 (int)crec->ctdb_ctx->db_id, keystr));
989 TALLOC_FREE(keystr);
992 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
993 DEBUG(3, ("tdb_chainlock failed\n"));
994 TALLOC_FREE(result);
995 return NULL;
998 result->store = db_ctdb_store;
999 result->delete_rec = db_ctdb_delete;
1000 talloc_set_destructor(result, db_ctdb_record_destr);
1002 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1005 * See if we have a valid record and we are the dmaster. If so, we can
1006 * take the shortcut and just return it.
1009 if ((ctdb_data.dptr == NULL) ||
1010 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1011 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1012 #if 0
1013 || (random() % 2 != 0)
1014 #endif
1016 SAFE_FREE(ctdb_data.dptr);
1017 tdb_chainunlock(ctx->wtdb->tdb, key);
1018 talloc_set_destructor(result, NULL);
1020 migrate_attempts += 1;
1022 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1023 ctdb_data.dptr, ctdb_data.dptr ?
1024 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1025 get_my_vnn()));
1027 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1028 if (!NT_STATUS_IS_OK(status)) {
1029 DEBUG(5, ("ctdb_migrate failed: %s\n",
1030 nt_errstr(status)));
1031 TALLOC_FREE(result);
1032 return NULL;
1034 /* now its migrated, try again */
1035 goto again;
1038 if (migrate_attempts > 10) {
1039 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1040 migrate_attempts));
1043 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1045 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1046 result->value.dptr = NULL;
1048 if ((result->value.dsize != 0)
1049 && !(result->value.dptr = (uint8 *)talloc_memdup(
1050 result, ctdb_data.dptr + sizeof(crec->header),
1051 result->value.dsize))) {
1052 DEBUG(0, ("talloc failed\n"));
1053 TALLOC_FREE(result);
1056 SAFE_FREE(ctdb_data.dptr);
1058 return result;
1061 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1062 TALLOC_CTX *mem_ctx,
1063 TDB_DATA key)
1065 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1066 struct db_ctdb_ctx);
1068 if (ctx->transaction != NULL) {
1069 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1072 if (db->persistent) {
1073 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1076 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1080 fetch (unlocked, no migration) operation on ctdb
1082 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1083 TDB_DATA key, TDB_DATA *data)
1085 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1086 struct db_ctdb_ctx);
1087 NTSTATUS status;
1088 TDB_DATA ctdb_data;
1090 if (ctx->transaction) {
1091 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1094 /* try a direct fetch */
1095 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1098 * See if we have a valid record and we are the dmaster. If so, we can
1099 * take the shortcut and just return it.
1100 * we bypass the dmaster check for persistent databases
1102 if ((ctdb_data.dptr != NULL) &&
1103 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1104 (db->persistent ||
1105 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1106 /* we are the dmaster - avoid the ctdb protocol op */
1108 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1109 if (data->dsize == 0) {
1110 SAFE_FREE(ctdb_data.dptr);
1111 data->dptr = NULL;
1112 return 0;
1115 data->dptr = (uint8 *)talloc_memdup(
1116 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1117 data->dsize);
1119 SAFE_FREE(ctdb_data.dptr);
1121 if (data->dptr == NULL) {
1122 return -1;
1124 return 0;
1127 SAFE_FREE(ctdb_data.dptr);
1129 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1130 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1131 if (!NT_STATUS_IS_OK(status)) {
1132 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1133 return -1;
1136 return 0;
/*
 * State passed through the traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* function to call for each record */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed on to fn */
	void *private_data;
};

1145 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1147 struct traverse_state *state = (struct traverse_state *)private_data;
1148 struct db_record *rec;
1149 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1150 /* we have to give them a locked record to prevent races */
1151 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1152 if (rec && rec->value.dsize > 0) {
1153 state->fn(rec, state->private_data);
1155 talloc_free(tmp_ctx);
1158 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1159 void *private_data)
1161 struct traverse_state *state = (struct traverse_state *)private_data;
1162 struct db_record *rec;
1163 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1164 int ret = 0;
1165 /* we have to give them a locked record to prevent races */
1166 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1167 if (rec && rec->value.dsize > 0) {
1168 ret = state->fn(rec, state->private_data);
1170 talloc_free(tmp_ctx);
1171 return ret;
1174 static int db_ctdb_traverse(struct db_context *db,
1175 int (*fn)(struct db_record *rec,
1176 void *private_data),
1177 void *private_data)
1179 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1180 struct db_ctdb_ctx);
1181 struct traverse_state state;
1183 state.db = db;
1184 state.fn = fn;
1185 state.private_data = private_data;
1187 if (db->persistent) {
1188 /* for persistent databases we don't need to do a ctdb traverse,
1189 we can do a faster local traverse */
1190 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1194 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1195 return 0;
1198 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1200 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1203 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1205 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1208 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1210 struct traverse_state *state = (struct traverse_state *)private_data;
1211 struct db_record rec;
1212 rec.key = key;
1213 rec.value = data;
1214 rec.store = db_ctdb_store_deny;
1215 rec.delete_rec = db_ctdb_delete_deny;
1216 rec.private_data = state->db;
1217 state->fn(&rec, state->private_data);
1220 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1221 void *private_data)
1223 struct traverse_state *state = (struct traverse_state *)private_data;
1224 struct db_record rec;
1225 rec.key = kbuf;
1226 rec.value = dbuf;
1227 rec.store = db_ctdb_store_deny;
1228 rec.delete_rec = db_ctdb_delete_deny;
1229 rec.private_data = state->db;
1231 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1232 /* a deleted record */
1233 return 0;
1235 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1236 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1238 return state->fn(&rec, state->private_data);
1241 static int db_ctdb_traverse_read(struct db_context *db,
1242 int (*fn)(struct db_record *rec,
1243 void *private_data),
1244 void *private_data)
1246 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1247 struct db_ctdb_ctx);
1248 struct traverse_state state;
1250 state.db = db;
1251 state.fn = fn;
1252 state.private_data = private_data;
1254 if (db->persistent) {
1255 /* for persistent databases we don't need to do a ctdb traverse,
1256 we can do a faster local traverse */
1257 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1260 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1261 return 0;
1264 static int db_ctdb_get_seqnum(struct db_context *db)
1266 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1267 struct db_ctdb_ctx);
1268 return tdb_get_seqnum(ctx->wtdb->tdb);
1271 static int db_ctdb_get_flags(struct db_context *db)
1273 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1274 struct db_ctdb_ctx);
1275 return tdb_get_flags(ctx->wtdb->tdb);
1278 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1279 const char *name,
1280 int hash_size, int tdb_flags,
1281 int open_flags, mode_t mode)
1283 struct db_context *result;
1284 struct db_ctdb_ctx *db_ctdb;
1285 char *db_path;
1287 if (!lp_clustering()) {
1288 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1289 return NULL;
1292 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1293 DEBUG(0, ("talloc failed\n"));
1294 TALLOC_FREE(result);
1295 return NULL;
1298 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1299 DEBUG(0, ("talloc failed\n"));
1300 TALLOC_FREE(result);
1301 return NULL;
1304 db_ctdb->transaction = NULL;
1305 db_ctdb->db = result;
1307 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1308 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1309 TALLOC_FREE(result);
1310 return NULL;
1313 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1315 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1317 /* only pass through specific flags */
1318 tdb_flags &= TDB_SEQNUM;
1320 /* honor permissions if user has specified O_CREAT */
1321 if (open_flags & O_CREAT) {
1322 chmod(db_path, mode);
1325 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1326 if (db_ctdb->wtdb == NULL) {
1327 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1328 TALLOC_FREE(result);
1329 return NULL;
1331 talloc_free(db_path);
1333 result->private_data = (void *)db_ctdb;
1334 result->fetch_locked = db_ctdb_fetch_locked;
1335 result->fetch = db_ctdb_fetch;
1336 result->traverse = db_ctdb_traverse;
1337 result->traverse_read = db_ctdb_traverse_read;
1338 result->get_seqnum = db_ctdb_get_seqnum;
1339 result->get_flags = db_ctdb_get_flags;
1340 result->transaction_start = db_ctdb_transaction_start;
1341 result->transaction_commit = db_ctdb_transaction_commit;
1342 result->transaction_cancel = db_ctdb_transaction_cancel;
1344 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1345 name, db_ctdb->db_id));
1347 return result;
1349 #endif