WHATSNEW: Update changes.
[Samba/gbeck.git] / source3 / lib / dbwrap_ctdb.c
blob8e188d0ab53a734106effb0dbbf1656627dade85
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-transaction state for a ctdb-backed database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/* true while the recorded operations are being replayed */
	bool in_replay;
	/*
	 * Operations performed under the transaction are recorded twice:
	 * - m_all collects both reads and writes,
	 * - m_write collects the writes only.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of nested transaction_start calls still open */
	uint32_t nesting;
	/* set when a nested transaction was cancelled */
	bool nested_cancel;
};
40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
78 /**
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TALLOC_CTX *mem_ctx,
86 TDB_DATA *data)
88 TDB_DATA rec;
89 NTSTATUS status;
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
94 if (data) {
95 ZERO_STRUCTP(data);
97 if (header) {
98 header->dmaster = (uint32_t)-1;
99 header->rsn = 0;
101 goto done;
104 if (header) {
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
108 if (data) {
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
111 data->dptr = NULL;
112 } else {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114 rec.dptr
115 + sizeof(struct ctdb_ltdb_header),
116 data->dsize);
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
119 goto done;
124 status = NT_STATUS_OK;
126 done:
127 SAFE_FREE(rec.dptr);
128 return status;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136 TDB_DATA key,
137 struct ctdb_ltdb_header *header,
138 TDB_DATA data)
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
141 TDB_DATA rec;
142 int ret;
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
168 of the record
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
171 TDB_DATA key,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA data)
175 size_t length;
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181 if (d == NULL) {
182 return NULL;
184 d->length = length;
185 d->reqid = reqid;
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
188 if (header) {
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192 } else {
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
196 return d;
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
203 uint64_t db_id,
204 uint32_t reqid,
205 TDB_DATA key,
206 struct ctdb_ltdb_header *header,
207 TDB_DATA data)
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214 if (r == NULL) {
215 talloc_free(m);
216 return NULL;
219 if (m == NULL) {
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222 if (m == NULL) {
223 goto done;
225 m->db_id = db_id;
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
233 if (m2 == NULL) {
234 talloc_free(m);
235 goto done;
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
240 m2->count++;
242 done:
243 talloc_free(r);
244 return m2;
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
250 TDB_DATA data;
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
253 return data;
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263 uint32_t *reqid,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
267 if (r == NULL) {
268 r = (struct ctdb_rec_data *)&m->data[0];
269 } else {
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
273 if (reqid != NULL) {
274 *reqid = r->reqid;
277 if (key != NULL) {
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
281 if (data != NULL) {
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
292 return NULL;
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
297 return r;
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
303 int32_t status;
304 NTSTATUS ret;
305 TDB_DATA indata;
307 indata.dptr = (uint8_t *)&db_id;
308 indata.dsize = sizeof(db_id);
310 ret = ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312 indata, NULL, NULL, &status);
314 if (!NT_STATUS_IS_OK(ret)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316 return -1;
319 return status;
324 * CTDB transaction destructor
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
328 tdb_transaction_cancel(h->ctx->wtdb->tdb);
329 return 0;
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
339 struct db_record *rh;
340 struct db_ctdb_rec *crec;
341 TDB_DATA key;
342 TALLOC_CTX *tmp_ctx;
343 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344 int ret;
345 struct db_ctdb_ctx *ctx = h->ctx;
346 TDB_DATA data;
347 pid_t pid;
348 NTSTATUS status;
349 struct ctdb_ltdb_header header;
350 int32_t transaction_status;
352 key.dptr = (uint8_t *)discard_const(keyname);
353 key.dsize = strlen(keyname);
355 again:
356 tmp_ctx = talloc_new(h);
358 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
359 if (rh == NULL) {
360 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
361 talloc_free(tmp_ctx);
362 return -1;
364 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
366 transaction_status = db_ctdb_transaction_active(ctx->db_id);
367 if (transaction_status == 1) {
368 unsigned long int usec = (1000 + random()) % 100000;
369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370 "Re-trying after %lu microseconds...",
371 ctx->db_id, usec));
372 talloc_free(tmp_ctx);
373 usleep(usec);
374 goto again;
378 * store the pid in the database:
379 * it is not enought that the node is dmaster...
381 pid = getpid();
382 data.dptr = (unsigned char *)&pid;
383 data.dsize = sizeof(pid_t);
384 crec->header.rsn++;
385 crec->header.dmaster = get_my_vnn();
386 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
387 if (!NT_STATUS_IS_OK(status)) {
388 DEBUG(0, (__location__ " Failed to store pid in transaction "
389 "record: %s\n", nt_errstr(status)));
390 talloc_free(tmp_ctx);
391 return -1;
394 talloc_free(rh);
396 ret = tdb_transaction_start(ctx->wtdb->tdb);
397 if (ret != 0) {
398 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
399 talloc_free(tmp_ctx);
400 return -1;
403 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
404 if (!NT_STATUS_IS_OK(status)) {
405 DEBUG(0, (__location__ " failed to refetch transaction lock "
406 "record inside transaction: %s - retrying\n",
407 nt_errstr(status)));
408 tdb_transaction_cancel(ctx->wtdb->tdb);
409 talloc_free(tmp_ctx);
410 goto again;
413 if (header.dmaster != get_my_vnn()) {
414 DEBUG(3, (__location__ " refetch transaction lock record : "
415 "we are not dmaster any more "
416 "(dmaster[%u] != my_vnn[%u]) - retrying\n",
417 header.dmaster, get_my_vnn()));
418 tdb_transaction_cancel(ctx->wtdb->tdb);
419 talloc_free(tmp_ctx);
420 goto again;
423 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
424 DEBUG(3, (__location__ " refetch transaction lock record: "
425 "another local process has started a transaction "
426 "(stored pid [%u] != my pid [%u]) - retrying\n",
427 *(pid_t *)(data.dptr), pid));
428 tdb_transaction_cancel(ctx->wtdb->tdb);
429 talloc_free(tmp_ctx);
430 goto again;
433 talloc_free(tmp_ctx);
435 return 0;
440 * CTDB dbwrap API: transaction_start function
441 * starts a transaction on a persistent database
443 static int db_ctdb_transaction_start(struct db_context *db)
445 struct db_ctdb_transaction_handle *h;
446 int ret;
447 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
448 struct db_ctdb_ctx);
450 if (!db->persistent) {
451 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
452 ctx->db_id));
453 return -1;
456 if (ctx->transaction) {
457 ctx->transaction->nesting++;
458 return 0;
461 h = talloc_zero(db, struct db_ctdb_transaction_handle);
462 if (h == NULL) {
463 DEBUG(0,(__location__ " oom for transaction handle\n"));
464 return -1;
467 h->ctx = ctx;
469 ret = db_ctdb_transaction_fetch_start(h);
470 if (ret != 0) {
471 talloc_free(h);
472 return -1;
475 talloc_set_destructor(h, db_ctdb_transaction_destructor);
477 ctx->transaction = h;
479 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
481 return 0;
487 fetch a record inside a transaction
489 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
490 TALLOC_CTX *mem_ctx,
491 TDB_DATA key, TDB_DATA *data)
493 struct db_ctdb_transaction_handle *h = db->transaction;
494 NTSTATUS status;
496 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
498 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
499 *data = tdb_null;
500 } else if (!NT_STATUS_IS_OK(status)) {
501 return -1;
504 if (!h->in_replay) {
505 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
506 if (h->m_all == NULL) {
507 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
508 data->dsize = 0;
509 talloc_free(data->dptr);
510 return -1;
514 return 0;
518 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
519 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
521 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
522 TALLOC_CTX *mem_ctx,
523 TDB_DATA key)
525 struct db_record *result;
526 TDB_DATA ctdb_data;
528 if (!(result = talloc(mem_ctx, struct db_record))) {
529 DEBUG(0, ("talloc failed\n"));
530 return NULL;
533 result->private_data = ctx->transaction;
535 result->key.dsize = key.dsize;
536 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
537 if (result->key.dptr == NULL) {
538 DEBUG(0, ("talloc failed\n"));
539 TALLOC_FREE(result);
540 return NULL;
543 result->store = db_ctdb_store_transaction;
544 result->delete_rec = db_ctdb_delete_transaction;
546 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
547 if (ctdb_data.dptr == NULL) {
548 /* create the record */
549 result->value = tdb_null;
550 return result;
553 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
554 result->value.dptr = NULL;
556 if ((result->value.dsize != 0)
557 && !(result->value.dptr = (uint8 *)talloc_memdup(
558 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
559 result->value.dsize))) {
560 DEBUG(0, ("talloc failed\n"));
561 TALLOC_FREE(result);
564 SAFE_FREE(ctdb_data.dptr);
566 return result;
569 static int db_ctdb_record_destructor(struct db_record **recp)
571 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
572 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
573 rec->private_data, struct db_ctdb_transaction_handle);
574 int ret = h->ctx->db->transaction_commit(h->ctx->db);
575 if (ret != 0) {
576 DEBUG(0,(__location__ " transaction_commit failed\n"));
578 return 0;
582 auto-create a transaction for persistent databases
584 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
585 TALLOC_CTX *mem_ctx,
586 TDB_DATA key)
588 int res;
589 struct db_record *rec, **recp;
591 res = db_ctdb_transaction_start(ctx->db);
592 if (res == -1) {
593 return NULL;
596 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
597 if (rec == NULL) {
598 ctx->db->transaction_cancel(ctx->db);
599 return NULL;
602 /* destroy this transaction when we release the lock */
603 recp = talloc(rec, struct db_record *);
604 if (recp == NULL) {
605 ctx->db->transaction_cancel(ctx->db);
606 talloc_free(rec);
607 return NULL;
609 *recp = rec;
610 talloc_set_destructor(recp, db_ctdb_record_destructor);
611 return rec;
616 stores a record inside a transaction
618 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
619 TDB_DATA key, TDB_DATA data)
621 TALLOC_CTX *tmp_ctx = talloc_new(h);
622 int ret;
623 TDB_DATA rec;
624 struct ctdb_ltdb_header header;
625 NTSTATUS status;
627 /* we need the header so we can update the RSN */
628 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
629 if (rec.dptr == NULL) {
630 /* the record doesn't exist - create one with us as dmaster.
631 This is only safe because we are in a transaction and this
632 is a persistent database */
633 ZERO_STRUCT(header);
634 } else {
635 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
636 rec.dsize -= sizeof(struct ctdb_ltdb_header);
637 /* a special case, we are writing the same data that is there now */
638 if (data.dsize == rec.dsize &&
639 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
640 SAFE_FREE(rec.dptr);
641 talloc_free(tmp_ctx);
642 return 0;
644 SAFE_FREE(rec.dptr);
647 header.dmaster = get_my_vnn();
648 header.rsn++;
650 if (!h->in_replay) {
651 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
652 if (h->m_all == NULL) {
653 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
654 talloc_free(tmp_ctx);
655 return -1;
659 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
660 if (h->m_write == NULL) {
661 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
662 talloc_free(tmp_ctx);
663 return -1;
666 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
667 if (NT_STATUS_IS_OK(status)) {
668 ret = 0;
669 } else {
670 ret = -1;
673 talloc_free(tmp_ctx);
675 return ret;
680 a record store inside a transaction
682 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
684 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
685 rec->private_data, struct db_ctdb_transaction_handle);
686 int ret;
688 ret = db_ctdb_transaction_store(h, rec->key, data);
689 if (ret != 0) {
690 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
692 return NT_STATUS_OK;
696 a record delete inside a transaction
698 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
700 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
701 rec->private_data, struct db_ctdb_transaction_handle);
702 int ret;
704 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
705 if (ret != 0) {
706 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
708 return NT_STATUS_OK;
713 replay a transaction
715 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
717 int ret, i;
718 struct ctdb_rec_data *rec = NULL;
720 h->in_replay = true;
721 talloc_free(h->m_write);
722 h->m_write = NULL;
724 ret = db_ctdb_transaction_fetch_start(h);
725 if (ret != 0) {
726 return ret;
729 for (i=0;i<h->m_all->count;i++) {
730 TDB_DATA key, data;
732 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
733 if (rec == NULL) {
734 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
735 goto failed;
738 if (rec->reqid == 0) {
739 /* its a store */
740 if (db_ctdb_transaction_store(h, key, data) != 0) {
741 goto failed;
743 } else {
744 TDB_DATA data2;
745 TALLOC_CTX *tmp_ctx = talloc_new(h);
747 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
748 talloc_free(tmp_ctx);
749 goto failed;
751 if (data2.dsize != data.dsize ||
752 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
753 /* the record has changed on us - we have to give up */
754 talloc_free(tmp_ctx);
755 goto failed;
757 talloc_free(tmp_ctx);
761 return 0;
763 failed:
764 tdb_transaction_cancel(h->ctx->wtdb->tdb);
765 return -1;
770 commit a transaction
772 static int db_ctdb_transaction_commit(struct db_context *db)
774 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
775 struct db_ctdb_ctx);
776 NTSTATUS rets;
777 int ret;
778 int status;
779 int retries = 0;
780 struct db_ctdb_transaction_handle *h = ctx->transaction;
781 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
783 if (h == NULL) {
784 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
785 return -1;
788 if (h->nested_cancel) {
789 db->transaction_cancel(db);
790 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
791 return -1;
794 if (h->nesting != 0) {
795 h->nesting--;
796 return 0;
799 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
801 talloc_set_destructor(h, NULL);
803 /* our commit strategy is quite complex.
805 - we first try to commit the changes to all other nodes
807 - if that works, then we commit locally and we are done
809 - if a commit on another node fails, then we need to cancel
810 the transaction, then restart the transaction (thus
811 opening a window of time for a pending recovery to
812 complete), then replay the transaction, checking all the
813 reads and writes (checking that reads give the same data,
814 and writes succeed). Then we retry the transaction to the
815 other nodes
818 again:
819 if (h->m_write == NULL) {
820 /* no changes were made, potentially after a retry */
821 tdb_transaction_cancel(h->ctx->wtdb->tdb);
822 talloc_free(h);
823 ctx->transaction = NULL;
824 return 0;
827 /* tell ctdbd to commit to the other nodes */
828 rets = ctdbd_control_local(messaging_ctdbd_connection(),
829 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
830 h->ctx->db_id, 0,
831 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
832 if (!NT_STATUS_IS_OK(rets) || status != 0) {
833 tdb_transaction_cancel(h->ctx->wtdb->tdb);
834 sleep(1);
836 if (!NT_STATUS_IS_OK(rets)) {
837 failure_control = CTDB_CONTROL_TRANS2_ERROR;
838 } else {
839 /* work out what error code we will give if we
840 have to fail the operation */
841 switch ((enum ctdb_trans2_commit_error)status) {
842 case CTDB_TRANS2_COMMIT_SUCCESS:
843 case CTDB_TRANS2_COMMIT_SOMEFAIL:
844 case CTDB_TRANS2_COMMIT_TIMEOUT:
845 failure_control = CTDB_CONTROL_TRANS2_ERROR;
846 break;
847 case CTDB_TRANS2_COMMIT_ALLFAIL:
848 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
849 break;
853 if (++retries == 100) {
854 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
855 h->ctx->db_id, retries, (unsigned)failure_control));
856 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
857 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
858 tdb_null, NULL, NULL, NULL);
859 h->ctx->transaction = NULL;
860 talloc_free(h);
861 ctx->transaction = NULL;
862 return -1;
865 if (ctdb_replay_transaction(h) != 0) {
866 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
867 (unsigned)failure_control));
868 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
869 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
870 tdb_null, NULL, NULL, NULL);
871 h->ctx->transaction = NULL;
872 talloc_free(h);
873 ctx->transaction = NULL;
874 return -1;
876 goto again;
877 } else {
878 failure_control = CTDB_CONTROL_TRANS2_ERROR;
881 /* do the real commit locally */
882 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
883 if (ret != 0) {
884 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
885 (unsigned)failure_control));
886 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
887 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
888 h->ctx->transaction = NULL;
889 talloc_free(h);
890 return ret;
893 /* tell ctdbd that we are finished with our local commit */
894 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
895 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
896 tdb_null, NULL, NULL, NULL);
897 h->ctx->transaction = NULL;
898 talloc_free(h);
899 return 0;
904 cancel a transaction
906 static int db_ctdb_transaction_cancel(struct db_context *db)
908 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
909 struct db_ctdb_ctx);
910 struct db_ctdb_transaction_handle *h = ctx->transaction;
912 if (h == NULL) {
913 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
914 return -1;
917 if (h->nesting != 0) {
918 h->nesting--;
919 h->nested_cancel = true;
920 return 0;
923 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
925 ctx->transaction = NULL;
926 talloc_free(h);
927 return 0;
931 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
933 struct db_ctdb_rec *crec = talloc_get_type_abort(
934 rec->private_data, struct db_ctdb_rec);
936 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
941 static NTSTATUS db_ctdb_delete(struct db_record *rec)
943 TDB_DATA data;
946 * We have to store the header with empty data. TODO: Fix the
947 * tdb-level cleanup
950 ZERO_STRUCT(data);
952 return db_ctdb_store(rec, data, 0);
956 static int db_ctdb_record_destr(struct db_record* data)
958 struct db_ctdb_rec *crec = talloc_get_type_abort(
959 data->private_data, struct db_ctdb_rec);
961 DEBUG(10, (DEBUGLEVEL > 10
962 ? "Unlocking db %u key %s\n"
963 : "Unlocking db %u key %.20s\n",
964 (int)crec->ctdb_ctx->db_id,
965 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
966 data->key.dsize)));
968 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
969 DEBUG(0, ("tdb_chainunlock failed\n"));
970 return -1;
973 return 0;
976 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
977 TALLOC_CTX *mem_ctx,
978 TDB_DATA key,
979 bool persistent)
981 struct db_record *result;
982 struct db_ctdb_rec *crec;
983 NTSTATUS status;
984 TDB_DATA ctdb_data;
985 int migrate_attempts = 0;
987 if (!(result = talloc(mem_ctx, struct db_record))) {
988 DEBUG(0, ("talloc failed\n"));
989 return NULL;
992 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
993 DEBUG(0, ("talloc failed\n"));
994 TALLOC_FREE(result);
995 return NULL;
998 result->private_data = (void *)crec;
999 crec->ctdb_ctx = ctx;
1001 result->key.dsize = key.dsize;
1002 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
1003 if (result->key.dptr == NULL) {
1004 DEBUG(0, ("talloc failed\n"));
1005 TALLOC_FREE(result);
1006 return NULL;
1010 * Do a blocking lock on the record
1012 again:
1014 if (DEBUGLEVEL >= 10) {
1015 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1016 DEBUG(10, (DEBUGLEVEL > 10
1017 ? "Locking db %u key %s\n"
1018 : "Locking db %u key %.20s\n",
1019 (int)crec->ctdb_ctx->db_id, keystr));
1020 TALLOC_FREE(keystr);
1023 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1024 DEBUG(3, ("tdb_chainlock failed\n"));
1025 TALLOC_FREE(result);
1026 return NULL;
1029 result->store = db_ctdb_store;
1030 result->delete_rec = db_ctdb_delete;
1031 talloc_set_destructor(result, db_ctdb_record_destr);
1033 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1036 * See if we have a valid record and we are the dmaster. If so, we can
1037 * take the shortcut and just return it.
1040 if ((ctdb_data.dptr == NULL) ||
1041 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1042 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1043 #if 0
1044 || (random() % 2 != 0)
1045 #endif
1047 SAFE_FREE(ctdb_data.dptr);
1048 tdb_chainunlock(ctx->wtdb->tdb, key);
1049 talloc_set_destructor(result, NULL);
1051 migrate_attempts += 1;
1053 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1054 ctdb_data.dptr, ctdb_data.dptr ?
1055 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1056 get_my_vnn()));
1058 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1059 if (!NT_STATUS_IS_OK(status)) {
1060 DEBUG(5, ("ctdb_migrate failed: %s\n",
1061 nt_errstr(status)));
1062 TALLOC_FREE(result);
1063 return NULL;
1065 /* now its migrated, try again */
1066 goto again;
1069 if (migrate_attempts > 10) {
1070 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1071 migrate_attempts));
1074 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1076 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1077 result->value.dptr = NULL;
1079 if ((result->value.dsize != 0)
1080 && !(result->value.dptr = (uint8 *)talloc_memdup(
1081 result, ctdb_data.dptr + sizeof(crec->header),
1082 result->value.dsize))) {
1083 DEBUG(0, ("talloc failed\n"));
1084 TALLOC_FREE(result);
1087 SAFE_FREE(ctdb_data.dptr);
1089 return result;
1092 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1093 TALLOC_CTX *mem_ctx,
1094 TDB_DATA key)
1096 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1097 struct db_ctdb_ctx);
1099 if (ctx->transaction != NULL) {
1100 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1103 if (db->persistent) {
1104 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1107 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1111 fetch (unlocked, no migration) operation on ctdb
1113 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1114 TDB_DATA key, TDB_DATA *data)
1116 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1117 struct db_ctdb_ctx);
1118 NTSTATUS status;
1119 TDB_DATA ctdb_data;
1121 if (ctx->transaction) {
1122 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1125 /* try a direct fetch */
1126 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1129 * See if we have a valid record and we are the dmaster. If so, we can
1130 * take the shortcut and just return it.
1131 * we bypass the dmaster check for persistent databases
1133 if ((ctdb_data.dptr != NULL) &&
1134 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1135 (db->persistent ||
1136 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1137 /* we are the dmaster - avoid the ctdb protocol op */
1139 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1140 if (data->dsize == 0) {
1141 SAFE_FREE(ctdb_data.dptr);
1142 data->dptr = NULL;
1143 return 0;
1146 data->dptr = (uint8 *)talloc_memdup(
1147 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1148 data->dsize);
1150 SAFE_FREE(ctdb_data.dptr);
1152 if (data->dptr == NULL) {
1153 return -1;
1155 return 0;
1158 SAFE_FREE(ctdb_data.dptr);
1160 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1161 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1162 if (!NT_STATUS_IS_OK(status)) {
1163 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1164 return -1;
1167 return 0;
/*
 * Context handed to the traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1176 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1178 struct traverse_state *state = (struct traverse_state *)private_data;
1179 struct db_record *rec;
1180 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1181 /* we have to give them a locked record to prevent races */
1182 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1183 if (rec && rec->value.dsize > 0) {
1184 state->fn(rec, state->private_data);
1186 talloc_free(tmp_ctx);
1189 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1190 void *private_data)
1192 struct traverse_state *state = (struct traverse_state *)private_data;
1193 struct db_record *rec;
1194 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1195 int ret = 0;
1196 /* we have to give them a locked record to prevent races */
1197 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1198 if (rec && rec->value.dsize > 0) {
1199 ret = state->fn(rec, state->private_data);
1201 talloc_free(tmp_ctx);
1202 return ret;
1205 static int db_ctdb_traverse(struct db_context *db,
1206 int (*fn)(struct db_record *rec,
1207 void *private_data),
1208 void *private_data)
1210 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1211 struct db_ctdb_ctx);
1212 struct traverse_state state;
1214 state.db = db;
1215 state.fn = fn;
1216 state.private_data = private_data;
1218 if (db->persistent) {
1219 /* for persistent databases we don't need to do a ctdb traverse,
1220 we can do a faster local traverse */
1221 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1225 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1226 return 0;
1229 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1231 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1234 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1236 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1239 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1241 struct traverse_state *state = (struct traverse_state *)private_data;
1242 struct db_record rec;
1243 rec.key = key;
1244 rec.value = data;
1245 rec.store = db_ctdb_store_deny;
1246 rec.delete_rec = db_ctdb_delete_deny;
1247 rec.private_data = state->db;
1248 state->fn(&rec, state->private_data);
1251 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1252 void *private_data)
1254 struct traverse_state *state = (struct traverse_state *)private_data;
1255 struct db_record rec;
1256 rec.key = kbuf;
1257 rec.value = dbuf;
1258 rec.store = db_ctdb_store_deny;
1259 rec.delete_rec = db_ctdb_delete_deny;
1260 rec.private_data = state->db;
1262 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1263 /* a deleted record */
1264 return 0;
1266 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1267 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1269 return state->fn(&rec, state->private_data);
1272 static int db_ctdb_traverse_read(struct db_context *db,
1273 int (*fn)(struct db_record *rec,
1274 void *private_data),
1275 void *private_data)
1277 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1278 struct db_ctdb_ctx);
1279 struct traverse_state state;
1281 state.db = db;
1282 state.fn = fn;
1283 state.private_data = private_data;
1285 if (db->persistent) {
1286 /* for persistent databases we don't need to do a ctdb traverse,
1287 we can do a faster local traverse */
1288 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1291 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1292 return 0;
1295 static int db_ctdb_get_seqnum(struct db_context *db)
1297 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1298 struct db_ctdb_ctx);
1299 return tdb_get_seqnum(ctx->wtdb->tdb);
1302 static int db_ctdb_get_flags(struct db_context *db)
1304 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1305 struct db_ctdb_ctx);
1306 return tdb_get_flags(ctx->wtdb->tdb);
1309 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1310 const char *name,
1311 int hash_size, int tdb_flags,
1312 int open_flags, mode_t mode)
1314 struct db_context *result;
1315 struct db_ctdb_ctx *db_ctdb;
1316 char *db_path;
1318 if (!lp_clustering()) {
1319 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1320 return NULL;
1323 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1324 DEBUG(0, ("talloc failed\n"));
1325 TALLOC_FREE(result);
1326 return NULL;
1329 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1330 DEBUG(0, ("talloc failed\n"));
1331 TALLOC_FREE(result);
1332 return NULL;
1335 db_ctdb->transaction = NULL;
1336 db_ctdb->db = result;
1338 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1339 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1340 TALLOC_FREE(result);
1341 return NULL;
1344 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1346 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1348 /* only pass through specific flags */
1349 tdb_flags &= TDB_SEQNUM;
1351 /* honor permissions if user has specified O_CREAT */
1352 if (open_flags & O_CREAT) {
1353 chmod(db_path, mode);
1356 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1357 if (db_ctdb->wtdb == NULL) {
1358 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1359 TALLOC_FREE(result);
1360 return NULL;
1362 talloc_free(db_path);
1364 result->private_data = (void *)db_ctdb;
1365 result->fetch_locked = db_ctdb_fetch_locked;
1366 result->fetch = db_ctdb_fetch;
1367 result->traverse = db_ctdb_traverse;
1368 result->traverse_read = db_ctdb_traverse_read;
1369 result->get_seqnum = db_ctdb_get_seqnum;
1370 result->get_flags = db_ctdb_get_flags;
1371 result->transaction_start = db_ctdb_transaction_start;
1372 result->transaction_commit = db_ctdb_transaction_commit;
1373 result->transaction_cancel = db_ctdb_transaction_cancel;
1375 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1376 name, db_ctdb->db_id));
1378 return result;
1380 #endif