s4:upgradeschema.py Cleanup
[Samba/ekacnet.git] / source3 / lib / dbwrap_ctdb.c
blob8563990a84fa15979c6422e1b35b8ea00b5d5368
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-transaction state for a ctdb backed database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* true while the recorded operations are being replayed */
	bool in_replay;
	/*
	 * Operations performed under the transaction are recorded in
	 * two marshall buffers:
	 * - m_all collects both reads and writes,
	 * - m_write collects only the writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of transaction_start calls stacked on the open one */
	uint32_t nesting;
	/* set when a nested level was cancelled; makes the commit fail */
	bool nested_cancel;
};
40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
78 /**
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TALLOC_CTX *mem_ctx,
86 TDB_DATA *data)
88 TDB_DATA rec;
89 NTSTATUS status;
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
94 if (data) {
95 ZERO_STRUCTP(data);
97 if (header) {
98 header->dmaster = (uint32_t)-1;
99 header->rsn = 0;
101 goto done;
104 if (header) {
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
108 if (data) {
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
111 data->dptr = NULL;
112 } else {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114 rec.dptr
115 + sizeof(struct ctdb_ltdb_header),
116 data->dsize);
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
119 goto done;
124 status = NT_STATUS_OK;
126 done:
127 SAFE_FREE(rec.dptr);
128 return status;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136 TDB_DATA key,
137 struct ctdb_ltdb_header *header,
138 TDB_DATA data)
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
141 TDB_DATA rec;
142 int ret;
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
168 of the record
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
171 TDB_DATA key,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA data)
175 size_t length;
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181 if (d == NULL) {
182 return NULL;
184 d->length = length;
185 d->reqid = reqid;
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
188 if (header) {
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192 } else {
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
196 return d;
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
203 uint64_t db_id,
204 uint32_t reqid,
205 TDB_DATA key,
206 struct ctdb_ltdb_header *header,
207 TDB_DATA data)
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214 if (r == NULL) {
215 talloc_free(m);
216 return NULL;
219 if (m == NULL) {
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222 if (m == NULL) {
223 goto done;
225 m->db_id = db_id;
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
233 if (m2 == NULL) {
234 talloc_free(m);
235 goto done;
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
240 m2->count++;
242 done:
243 talloc_free(r);
244 return m2;
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
250 TDB_DATA data;
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
253 return data;
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263 uint32_t *reqid,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
267 if (r == NULL) {
268 r = (struct ctdb_rec_data *)&m->data[0];
269 } else {
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
273 if (reqid != NULL) {
274 *reqid = r->reqid;
277 if (key != NULL) {
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
281 if (data != NULL) {
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
292 return NULL;
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
297 return r;
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
303 int32_t status;
304 NTSTATUS ret;
305 TDB_DATA indata;
307 indata.dptr = (uint8_t *)&db_id;
308 indata.dsize = sizeof(db_id);
310 ret = ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312 indata, NULL, NULL, &status);
314 if (!NT_STATUS_IS_OK(ret)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316 return -1;
319 return status;
324 * CTDB transaction destructor
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
328 tdb_transaction_cancel(h->ctx->wtdb->tdb);
329 return 0;
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
339 struct db_record *rh;
340 struct db_ctdb_rec *crec;
341 TDB_DATA key;
342 TALLOC_CTX *tmp_ctx;
343 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344 int ret;
345 struct db_ctdb_ctx *ctx = h->ctx;
346 TDB_DATA data;
347 pid_t pid;
348 NTSTATUS status;
349 struct ctdb_ltdb_header header;
350 int32_t transaction_status;
352 key.dptr = (uint8_t *)discard_const(keyname);
353 key.dsize = strlen(keyname);
355 again:
356 tmp_ctx = talloc_new(h);
358 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
359 if (rh == NULL) {
360 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
361 talloc_free(tmp_ctx);
362 return -1;
364 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
366 transaction_status = db_ctdb_transaction_active(ctx->db_id);
367 if (transaction_status == 1) {
368 unsigned long int usec = (1000 + random()) % 100000;
369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370 "Re-trying after %lu microseconds...",
371 ctx->db_id, usec));
372 talloc_free(tmp_ctx);
373 usleep(usec);
374 goto again;
378 * store the pid in the database:
379 * it is not enought that the node is dmaster...
381 pid = getpid();
382 data.dptr = (unsigned char *)&pid;
383 data.dsize = sizeof(pid_t);
384 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
385 if (!NT_STATUS_IS_OK(status)) {
386 DEBUG(0, (__location__ " Failed to store pid in transaction "
387 "record: %s\n", nt_errstr(status)));
388 talloc_free(tmp_ctx);
389 return -1;
392 talloc_free(rh);
394 ret = tdb_transaction_start(ctx->wtdb->tdb);
395 if (ret != 0) {
396 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
397 talloc_free(tmp_ctx);
398 return -1;
401 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
402 if (!NT_STATUS_IS_OK(status)) {
403 DEBUG(0, (__location__ " failed to refetch transaction lock "
404 "record inside transaction: %s - retrying\n",
405 nt_errstr(status)));
406 tdb_transaction_cancel(ctx->wtdb->tdb);
407 talloc_free(tmp_ctx);
408 goto again;
411 if (header.dmaster != get_my_vnn()) {
412 DEBUG(3, (__location__ " refetch transaction lock record : "
413 "we are not dmaster any more "
414 "(dmaster[%u] != my_vnn[%u]) - retrying\n",
415 header.dmaster, get_my_vnn()));
416 tdb_transaction_cancel(ctx->wtdb->tdb);
417 talloc_free(tmp_ctx);
418 goto again;
421 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
422 DEBUG(3, (__location__ " refetch transaction lock record: "
423 "another local process has started a transaction "
424 "(stored pid [%u] != my pid [%u]) - retrying\n",
425 *(pid_t *)(data.dptr), pid));
426 tdb_transaction_cancel(ctx->wtdb->tdb);
427 talloc_free(tmp_ctx);
428 goto again;
431 talloc_free(tmp_ctx);
433 return 0;
438 * CTDB dbwrap API: transaction_start function
439 * starts a transaction on a persistent database
441 static int db_ctdb_transaction_start(struct db_context *db)
443 struct db_ctdb_transaction_handle *h;
444 int ret;
445 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
446 struct db_ctdb_ctx);
448 if (!db->persistent) {
449 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
450 ctx->db_id));
451 return -1;
454 if (ctx->transaction) {
455 ctx->transaction->nesting++;
456 return 0;
459 h = talloc_zero(db, struct db_ctdb_transaction_handle);
460 if (h == NULL) {
461 DEBUG(0,(__location__ " oom for transaction handle\n"));
462 return -1;
465 h->ctx = ctx;
467 ret = db_ctdb_transaction_fetch_start(h);
468 if (ret != 0) {
469 talloc_free(h);
470 return -1;
473 talloc_set_destructor(h, db_ctdb_transaction_destructor);
475 ctx->transaction = h;
477 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
479 return 0;
485 fetch a record inside a transaction
487 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
488 TALLOC_CTX *mem_ctx,
489 TDB_DATA key, TDB_DATA *data)
491 struct db_ctdb_transaction_handle *h = db->transaction;
492 NTSTATUS status;
494 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
496 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
497 *data = tdb_null;
498 } else if (!NT_STATUS_IS_OK(status)) {
499 return -1;
502 if (!h->in_replay) {
503 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
504 if (h->m_all == NULL) {
505 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
506 data->dsize = 0;
507 talloc_free(data->dptr);
508 return -1;
512 return 0;
516 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
517 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
519 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
520 TALLOC_CTX *mem_ctx,
521 TDB_DATA key)
523 struct db_record *result;
524 TDB_DATA ctdb_data;
526 if (!(result = talloc(mem_ctx, struct db_record))) {
527 DEBUG(0, ("talloc failed\n"));
528 return NULL;
531 result->private_data = ctx->transaction;
533 result->key.dsize = key.dsize;
534 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
535 if (result->key.dptr == NULL) {
536 DEBUG(0, ("talloc failed\n"));
537 TALLOC_FREE(result);
538 return NULL;
541 result->store = db_ctdb_store_transaction;
542 result->delete_rec = db_ctdb_delete_transaction;
544 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
545 if (ctdb_data.dptr == NULL) {
546 /* create the record */
547 result->value = tdb_null;
548 return result;
551 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
552 result->value.dptr = NULL;
554 if ((result->value.dsize != 0)
555 && !(result->value.dptr = (uint8 *)talloc_memdup(
556 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
557 result->value.dsize))) {
558 DEBUG(0, ("talloc failed\n"));
559 TALLOC_FREE(result);
562 SAFE_FREE(ctdb_data.dptr);
564 return result;
567 static int db_ctdb_record_destructor(struct db_record **recp)
569 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
570 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
571 rec->private_data, struct db_ctdb_transaction_handle);
572 int ret = h->ctx->db->transaction_commit(h->ctx->db);
573 if (ret != 0) {
574 DEBUG(0,(__location__ " transaction_commit failed\n"));
576 return 0;
580 auto-create a transaction for persistent databases
582 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
583 TALLOC_CTX *mem_ctx,
584 TDB_DATA key)
586 int res;
587 struct db_record *rec, **recp;
589 res = db_ctdb_transaction_start(ctx->db);
590 if (res == -1) {
591 return NULL;
594 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
595 if (rec == NULL) {
596 ctx->db->transaction_cancel(ctx->db);
597 return NULL;
600 /* destroy this transaction when we release the lock */
601 recp = talloc(rec, struct db_record *);
602 if (recp == NULL) {
603 ctx->db->transaction_cancel(ctx->db);
604 talloc_free(rec);
605 return NULL;
607 *recp = rec;
608 talloc_set_destructor(recp, db_ctdb_record_destructor);
609 return rec;
614 stores a record inside a transaction
616 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
617 TDB_DATA key, TDB_DATA data)
619 TALLOC_CTX *tmp_ctx = talloc_new(h);
620 int ret;
621 TDB_DATA rec;
622 struct ctdb_ltdb_header header;
623 NTSTATUS status;
625 /* we need the header so we can update the RSN */
626 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
627 if (rec.dptr == NULL) {
628 /* the record doesn't exist - create one with us as dmaster.
629 This is only safe because we are in a transaction and this
630 is a persistent database */
631 ZERO_STRUCT(header);
632 } else {
633 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
634 rec.dsize -= sizeof(struct ctdb_ltdb_header);
635 /* a special case, we are writing the same data that is there now */
636 if (data.dsize == rec.dsize &&
637 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
638 SAFE_FREE(rec.dptr);
639 talloc_free(tmp_ctx);
640 return 0;
642 SAFE_FREE(rec.dptr);
645 header.dmaster = get_my_vnn();
646 header.rsn++;
648 if (!h->in_replay) {
649 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
650 if (h->m_all == NULL) {
651 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
652 talloc_free(tmp_ctx);
653 return -1;
657 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
658 if (h->m_write == NULL) {
659 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
660 talloc_free(tmp_ctx);
661 return -1;
664 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
665 if (NT_STATUS_IS_OK(status)) {
666 ret = 0;
667 } else {
668 ret = -1;
671 talloc_free(tmp_ctx);
673 return ret;
678 a record store inside a transaction
680 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
682 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
683 rec->private_data, struct db_ctdb_transaction_handle);
684 int ret;
686 ret = db_ctdb_transaction_store(h, rec->key, data);
687 if (ret != 0) {
688 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
690 return NT_STATUS_OK;
694 a record delete inside a transaction
696 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
698 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
699 rec->private_data, struct db_ctdb_transaction_handle);
700 int ret;
702 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
703 if (ret != 0) {
704 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
706 return NT_STATUS_OK;
711 replay a transaction
713 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
715 int ret, i;
716 struct ctdb_rec_data *rec = NULL;
718 h->in_replay = true;
719 talloc_free(h->m_write);
720 h->m_write = NULL;
722 ret = db_ctdb_transaction_fetch_start(h);
723 if (ret != 0) {
724 return ret;
727 for (i=0;i<h->m_all->count;i++) {
728 TDB_DATA key, data;
730 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
731 if (rec == NULL) {
732 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
733 goto failed;
736 if (rec->reqid == 0) {
737 /* its a store */
738 if (db_ctdb_transaction_store(h, key, data) != 0) {
739 goto failed;
741 } else {
742 TDB_DATA data2;
743 TALLOC_CTX *tmp_ctx = talloc_new(h);
745 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
746 talloc_free(tmp_ctx);
747 goto failed;
749 if (data2.dsize != data.dsize ||
750 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
751 /* the record has changed on us - we have to give up */
752 talloc_free(tmp_ctx);
753 goto failed;
755 talloc_free(tmp_ctx);
759 return 0;
761 failed:
762 tdb_transaction_cancel(h->ctx->wtdb->tdb);
763 return -1;
768 commit a transaction
770 static int db_ctdb_transaction_commit(struct db_context *db)
772 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
773 struct db_ctdb_ctx);
774 NTSTATUS rets;
775 int ret;
776 int status;
777 int retries = 0;
778 struct db_ctdb_transaction_handle *h = ctx->transaction;
779 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
781 if (h == NULL) {
782 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
783 return -1;
786 if (h->nested_cancel) {
787 db->transaction_cancel(db);
788 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
789 return -1;
792 if (h->nesting != 0) {
793 h->nesting--;
794 return 0;
797 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
799 talloc_set_destructor(h, NULL);
801 /* our commit strategy is quite complex.
803 - we first try to commit the changes to all other nodes
805 - if that works, then we commit locally and we are done
807 - if a commit on another node fails, then we need to cancel
808 the transaction, then restart the transaction (thus
809 opening a window of time for a pending recovery to
810 complete), then replay the transaction, checking all the
811 reads and writes (checking that reads give the same data,
812 and writes succeed). Then we retry the transaction to the
813 other nodes
816 again:
817 if (h->m_write == NULL) {
818 /* no changes were made, potentially after a retry */
819 tdb_transaction_cancel(h->ctx->wtdb->tdb);
820 talloc_free(h);
821 ctx->transaction = NULL;
822 return 0;
825 /* tell ctdbd to commit to the other nodes */
826 rets = ctdbd_control_local(messaging_ctdbd_connection(),
827 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
828 h->ctx->db_id, 0,
829 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
830 if (!NT_STATUS_IS_OK(rets) || status != 0) {
831 tdb_transaction_cancel(h->ctx->wtdb->tdb);
832 sleep(1);
834 if (!NT_STATUS_IS_OK(rets)) {
835 failure_control = CTDB_CONTROL_TRANS2_ERROR;
836 } else {
837 /* work out what error code we will give if we
838 have to fail the operation */
839 switch ((enum ctdb_trans2_commit_error)status) {
840 case CTDB_TRANS2_COMMIT_SUCCESS:
841 case CTDB_TRANS2_COMMIT_SOMEFAIL:
842 case CTDB_TRANS2_COMMIT_TIMEOUT:
843 failure_control = CTDB_CONTROL_TRANS2_ERROR;
844 break;
845 case CTDB_TRANS2_COMMIT_ALLFAIL:
846 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
847 break;
851 if (++retries == 5) {
852 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
853 h->ctx->db_id, retries, (unsigned)failure_control));
854 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
855 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
856 tdb_null, NULL, NULL, NULL);
857 h->ctx->transaction = NULL;
858 talloc_free(h);
859 ctx->transaction = NULL;
860 return -1;
863 if (ctdb_replay_transaction(h) != 0) {
864 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
865 (unsigned)failure_control));
866 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
867 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
868 tdb_null, NULL, NULL, NULL);
869 h->ctx->transaction = NULL;
870 talloc_free(h);
871 ctx->transaction = NULL;
872 return -1;
874 goto again;
875 } else {
876 failure_control = CTDB_CONTROL_TRANS2_ERROR;
879 /* do the real commit locally */
880 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
881 if (ret != 0) {
882 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
883 (unsigned)failure_control));
884 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
885 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
886 h->ctx->transaction = NULL;
887 talloc_free(h);
888 return ret;
891 /* tell ctdbd that we are finished with our local commit */
892 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
893 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
894 tdb_null, NULL, NULL, NULL);
895 h->ctx->transaction = NULL;
896 talloc_free(h);
897 return 0;
902 cancel a transaction
904 static int db_ctdb_transaction_cancel(struct db_context *db)
906 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
907 struct db_ctdb_ctx);
908 struct db_ctdb_transaction_handle *h = ctx->transaction;
910 if (h == NULL) {
911 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
912 return -1;
915 if (h->nesting != 0) {
916 h->nesting--;
917 h->nested_cancel = true;
918 return 0;
921 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
923 ctx->transaction = NULL;
924 talloc_free(h);
925 return 0;
929 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
931 struct db_ctdb_rec *crec = talloc_get_type_abort(
932 rec->private_data, struct db_ctdb_rec);
934 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
939 static NTSTATUS db_ctdb_delete(struct db_record *rec)
941 TDB_DATA data;
944 * We have to store the header with empty data. TODO: Fix the
945 * tdb-level cleanup
948 ZERO_STRUCT(data);
950 return db_ctdb_store(rec, data, 0);
954 static int db_ctdb_record_destr(struct db_record* data)
956 struct db_ctdb_rec *crec = talloc_get_type_abort(
957 data->private_data, struct db_ctdb_rec);
959 DEBUG(10, (DEBUGLEVEL > 10
960 ? "Unlocking db %u key %s\n"
961 : "Unlocking db %u key %.20s\n",
962 (int)crec->ctdb_ctx->db_id,
963 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
964 data->key.dsize)));
966 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
967 DEBUG(0, ("tdb_chainunlock failed\n"));
968 return -1;
971 return 0;
974 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
975 TALLOC_CTX *mem_ctx,
976 TDB_DATA key,
977 bool persistent)
979 struct db_record *result;
980 struct db_ctdb_rec *crec;
981 NTSTATUS status;
982 TDB_DATA ctdb_data;
983 int migrate_attempts = 0;
985 if (!(result = talloc(mem_ctx, struct db_record))) {
986 DEBUG(0, ("talloc failed\n"));
987 return NULL;
990 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
991 DEBUG(0, ("talloc failed\n"));
992 TALLOC_FREE(result);
993 return NULL;
996 result->private_data = (void *)crec;
997 crec->ctdb_ctx = ctx;
999 result->key.dsize = key.dsize;
1000 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
1001 if (result->key.dptr == NULL) {
1002 DEBUG(0, ("talloc failed\n"));
1003 TALLOC_FREE(result);
1004 return NULL;
1008 * Do a blocking lock on the record
1010 again:
1012 if (DEBUGLEVEL >= 10) {
1013 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1014 DEBUG(10, (DEBUGLEVEL > 10
1015 ? "Locking db %u key %s\n"
1016 : "Locking db %u key %.20s\n",
1017 (int)crec->ctdb_ctx->db_id, keystr));
1018 TALLOC_FREE(keystr);
1021 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1022 DEBUG(3, ("tdb_chainlock failed\n"));
1023 TALLOC_FREE(result);
1024 return NULL;
1027 result->store = db_ctdb_store;
1028 result->delete_rec = db_ctdb_delete;
1029 talloc_set_destructor(result, db_ctdb_record_destr);
1031 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1034 * See if we have a valid record and we are the dmaster. If so, we can
1035 * take the shortcut and just return it.
1038 if ((ctdb_data.dptr == NULL) ||
1039 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1040 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1041 #if 0
1042 || (random() % 2 != 0)
1043 #endif
1045 SAFE_FREE(ctdb_data.dptr);
1046 tdb_chainunlock(ctx->wtdb->tdb, key);
1047 talloc_set_destructor(result, NULL);
1049 migrate_attempts += 1;
1051 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1052 ctdb_data.dptr, ctdb_data.dptr ?
1053 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1054 get_my_vnn()));
1056 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1057 if (!NT_STATUS_IS_OK(status)) {
1058 DEBUG(5, ("ctdb_migrate failed: %s\n",
1059 nt_errstr(status)));
1060 TALLOC_FREE(result);
1061 return NULL;
1063 /* now its migrated, try again */
1064 goto again;
1067 if (migrate_attempts > 10) {
1068 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1069 migrate_attempts));
1072 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1074 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1075 result->value.dptr = NULL;
1077 if ((result->value.dsize != 0)
1078 && !(result->value.dptr = (uint8 *)talloc_memdup(
1079 result, ctdb_data.dptr + sizeof(crec->header),
1080 result->value.dsize))) {
1081 DEBUG(0, ("talloc failed\n"));
1082 TALLOC_FREE(result);
1085 SAFE_FREE(ctdb_data.dptr);
1087 return result;
1090 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1091 TALLOC_CTX *mem_ctx,
1092 TDB_DATA key)
1094 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1095 struct db_ctdb_ctx);
1097 if (ctx->transaction != NULL) {
1098 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1101 if (db->persistent) {
1102 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1105 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1109 fetch (unlocked, no migration) operation on ctdb
1111 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1112 TDB_DATA key, TDB_DATA *data)
1114 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1115 struct db_ctdb_ctx);
1116 NTSTATUS status;
1117 TDB_DATA ctdb_data;
1119 if (ctx->transaction) {
1120 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1123 /* try a direct fetch */
1124 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1127 * See if we have a valid record and we are the dmaster. If so, we can
1128 * take the shortcut and just return it.
1129 * we bypass the dmaster check for persistent databases
1131 if ((ctdb_data.dptr != NULL) &&
1132 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1133 (db->persistent ||
1134 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1135 /* we are the dmaster - avoid the ctdb protocol op */
1137 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1138 if (data->dsize == 0) {
1139 SAFE_FREE(ctdb_data.dptr);
1140 data->dptr = NULL;
1141 return 0;
1144 data->dptr = (uint8 *)talloc_memdup(
1145 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1146 data->dsize);
1148 SAFE_FREE(ctdb_data.dptr);
1150 if (data->dptr == NULL) {
1151 return -1;
1153 return 0;
1156 SAFE_FREE(ctdb_data.dptr);
1158 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1159 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1160 if (!NT_STATUS_IS_OK(status)) {
1161 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1162 return -1;
1165 return 0;
/*
 * State passed through the tdb/ctdbd traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller supplied per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer handed to fn */
	void *private_data;
};
1174 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1176 struct traverse_state *state = (struct traverse_state *)private_data;
1177 struct db_record *rec;
1178 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1179 /* we have to give them a locked record to prevent races */
1180 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1181 if (rec && rec->value.dsize > 0) {
1182 state->fn(rec, state->private_data);
1184 talloc_free(tmp_ctx);
1187 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1188 void *private_data)
1190 struct traverse_state *state = (struct traverse_state *)private_data;
1191 struct db_record *rec;
1192 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1193 int ret = 0;
1194 /* we have to give them a locked record to prevent races */
1195 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1196 if (rec && rec->value.dsize > 0) {
1197 ret = state->fn(rec, state->private_data);
1199 talloc_free(tmp_ctx);
1200 return ret;
1203 static int db_ctdb_traverse(struct db_context *db,
1204 int (*fn)(struct db_record *rec,
1205 void *private_data),
1206 void *private_data)
1208 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1209 struct db_ctdb_ctx);
1210 struct traverse_state state;
1212 state.db = db;
1213 state.fn = fn;
1214 state.private_data = private_data;
1216 if (db->persistent) {
1217 /* for persistent databases we don't need to do a ctdb traverse,
1218 we can do a faster local traverse */
1219 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1223 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1224 return 0;
1227 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1229 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1232 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1234 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1237 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1239 struct traverse_state *state = (struct traverse_state *)private_data;
1240 struct db_record rec;
1241 rec.key = key;
1242 rec.value = data;
1243 rec.store = db_ctdb_store_deny;
1244 rec.delete_rec = db_ctdb_delete_deny;
1245 rec.private_data = state->db;
1246 state->fn(&rec, state->private_data);
1249 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1250 void *private_data)
1252 struct traverse_state *state = (struct traverse_state *)private_data;
1253 struct db_record rec;
1254 rec.key = kbuf;
1255 rec.value = dbuf;
1256 rec.store = db_ctdb_store_deny;
1257 rec.delete_rec = db_ctdb_delete_deny;
1258 rec.private_data = state->db;
1260 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1261 /* a deleted record */
1262 return 0;
1264 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1265 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1267 return state->fn(&rec, state->private_data);
1270 static int db_ctdb_traverse_read(struct db_context *db,
1271 int (*fn)(struct db_record *rec,
1272 void *private_data),
1273 void *private_data)
1275 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1276 struct db_ctdb_ctx);
1277 struct traverse_state state;
1279 state.db = db;
1280 state.fn = fn;
1281 state.private_data = private_data;
1283 if (db->persistent) {
1284 /* for persistent databases we don't need to do a ctdb traverse,
1285 we can do a faster local traverse */
1286 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1289 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1290 return 0;
1293 static int db_ctdb_get_seqnum(struct db_context *db)
1295 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1296 struct db_ctdb_ctx);
1297 return tdb_get_seqnum(ctx->wtdb->tdb);
1300 static int db_ctdb_get_flags(struct db_context *db)
1302 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1303 struct db_ctdb_ctx);
1304 return tdb_get_flags(ctx->wtdb->tdb);
1307 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1308 const char *name,
1309 int hash_size, int tdb_flags,
1310 int open_flags, mode_t mode)
1312 struct db_context *result;
1313 struct db_ctdb_ctx *db_ctdb;
1314 char *db_path;
1316 if (!lp_clustering()) {
1317 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1318 return NULL;
1321 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1322 DEBUG(0, ("talloc failed\n"));
1323 TALLOC_FREE(result);
1324 return NULL;
1327 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1328 DEBUG(0, ("talloc failed\n"));
1329 TALLOC_FREE(result);
1330 return NULL;
1333 db_ctdb->transaction = NULL;
1334 db_ctdb->db = result;
1336 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1337 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1338 TALLOC_FREE(result);
1339 return NULL;
1342 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1344 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1346 /* only pass through specific flags */
1347 tdb_flags &= TDB_SEQNUM;
1349 /* honor permissions if user has specified O_CREAT */
1350 if (open_flags & O_CREAT) {
1351 chmod(db_path, mode);
1354 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1355 if (db_ctdb->wtdb == NULL) {
1356 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1357 TALLOC_FREE(result);
1358 return NULL;
1360 talloc_free(db_path);
1362 result->private_data = (void *)db_ctdb;
1363 result->fetch_locked = db_ctdb_fetch_locked;
1364 result->fetch = db_ctdb_fetch;
1365 result->traverse = db_ctdb_traverse;
1366 result->traverse_read = db_ctdb_traverse_read;
1367 result->get_seqnum = db_ctdb_get_seqnum;
1368 result->get_flags = db_ctdb_get_flags;
1369 result->transaction_start = db_ctdb_transaction_start;
1370 result->transaction_commit = db_ctdb_transaction_commit;
1371 result->transaction_cancel = db_ctdb_transaction_cancel;
1373 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1374 name, db_ctdb->db_id));
1376 return result;
1378 #endif