s3:dbwrap_ctdb: fix race condition with concurrent transactions on the same node.
[Samba/gebeck_regimport.git] / source3 / lib / dbwrap_ctdb.c
blobfef984c8c31c49d08d2741f3b1b1bc4825f97f38
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State of one open transaction on a persistent ctdb database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* true while ctdb_replay_transaction() re-runs the recorded operations */
	bool in_replay;
	/*
	 * The operations done under the transaction are recorded in two
	 * marshall buffers:
	 * - m_all holds both the reads and the writes,
	 * - m_write holds the writes only.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* count of nested transaction_start calls not yet committed/cancelled */
	uint32_t nesting;
	/* set once a nested transaction has been cancelled */
	bool nested_cancel;
};
40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
78 /**
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TALLOC_CTX *mem_ctx,
86 TDB_DATA *data)
88 TDB_DATA rec;
89 NTSTATUS status;
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
94 if (data) {
95 ZERO_STRUCTP(data);
97 if (header) {
98 header->dmaster = (uint32_t)-1;
99 header->rsn = 0;
101 goto done;
104 if (header) {
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
108 if (data) {
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
111 data->dptr = NULL;
112 } else {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114 rec.dptr
115 + sizeof(struct ctdb_ltdb_header),
116 data->dsize);
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
119 goto done;
124 status = NT_STATUS_OK;
126 done:
127 SAFE_FREE(rec.dptr);
128 return status;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136 TDB_DATA key,
137 struct ctdb_ltdb_header *header,
138 TDB_DATA data)
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
141 TDB_DATA rec;
142 int ret;
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
168 of the record
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
171 TDB_DATA key,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA data)
175 size_t length;
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181 if (d == NULL) {
182 return NULL;
184 d->length = length;
185 d->reqid = reqid;
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
188 if (header) {
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192 } else {
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
196 return d;
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
203 uint64_t db_id,
204 uint32_t reqid,
205 TDB_DATA key,
206 struct ctdb_ltdb_header *header,
207 TDB_DATA data)
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214 if (r == NULL) {
215 talloc_free(m);
216 return NULL;
219 if (m == NULL) {
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222 if (m == NULL) {
223 goto done;
225 m->db_id = db_id;
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
233 if (m2 == NULL) {
234 talloc_free(m);
235 goto done;
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
240 m2->count++;
242 done:
243 talloc_free(r);
244 return m2;
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
250 TDB_DATA data;
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
253 return data;
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263 uint32_t *reqid,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
267 if (r == NULL) {
268 r = (struct ctdb_rec_data *)&m->data[0];
269 } else {
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
273 if (reqid != NULL) {
274 *reqid = r->reqid;
277 if (key != NULL) {
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
281 if (data != NULL) {
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
292 return NULL;
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
297 return r;
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
303 int32_t status;
304 NTSTATUS ret;
305 TDB_DATA indata;
307 indata.dptr = (uint8_t *)&db_id;
308 indata.dsize = sizeof(db_id);
310 ret = ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312 indata, NULL, NULL, &status);
314 if (!NT_STATUS_IS_OK(ret)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316 return -1;
319 return status;
324 * CTDB transaction destructor
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
328 tdb_transaction_cancel(h->ctx->wtdb->tdb);
329 return 0;
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
339 struct db_record *rh;
340 struct db_ctdb_rec *crec;
341 TDB_DATA key;
342 TALLOC_CTX *tmp_ctx;
343 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344 int ret;
345 struct db_ctdb_ctx *ctx = h->ctx;
346 TDB_DATA data;
347 pid_t pid;
348 NTSTATUS status;
349 struct ctdb_ltdb_header header;
350 int32_t transaction_status;
352 key.dptr = (uint8_t *)discard_const(keyname);
353 key.dsize = strlen(keyname);
355 again:
356 tmp_ctx = talloc_new(h);
358 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
359 if (rh == NULL) {
360 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
361 talloc_free(tmp_ctx);
362 return -1;
364 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
366 transaction_status = db_ctdb_transaction_active(ctx->db_id);
367 if (transaction_status == 1) {
368 unsigned long int usec = (1000 + random()) % 100000;
369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370 "Re-trying after %lu microseconds...",
371 ctx->db_id, usec));
372 talloc_free(tmp_ctx);
373 usleep(usec);
374 goto again;
378 * store the pid in the database:
379 * it is not enought that the node is dmaster...
381 pid = getpid();
382 data.dptr = (unsigned char *)&pid;
383 data.dsize = sizeof(pid_t);
384 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
385 if (!NT_STATUS_IS_OK(status)) {
386 DEBUG(0, (__location__ " Failed to store pid in transaction "
387 "record: %s\n", nt_errstr(status)));
388 talloc_free(tmp_ctx);
389 return -1;
392 talloc_free(rh);
394 ret = tdb_transaction_start(ctx->wtdb->tdb);
395 if (ret != 0) {
396 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
397 talloc_free(tmp_ctx);
398 return -1;
401 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
402 if (!NT_STATUS_IS_OK(status) || header.dmaster != get_my_vnn()) {
403 tdb_transaction_cancel(ctx->wtdb->tdb);
404 talloc_free(tmp_ctx);
405 goto again;
408 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
409 tdb_transaction_cancel(ctx->wtdb->tdb);
410 talloc_free(tmp_ctx);
411 goto again;
414 talloc_free(tmp_ctx);
416 return 0;
421 * CTDB dbwrap API: transaction_start function
422 * starts a transaction on a persistent database
424 static int db_ctdb_transaction_start(struct db_context *db)
426 struct db_ctdb_transaction_handle *h;
427 int ret;
428 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
429 struct db_ctdb_ctx);
431 if (!db->persistent) {
432 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
433 ctx->db_id));
434 return -1;
437 if (ctx->transaction) {
438 ctx->transaction->nesting++;
439 return 0;
442 h = talloc_zero(db, struct db_ctdb_transaction_handle);
443 if (h == NULL) {
444 DEBUG(0,(__location__ " oom for transaction handle\n"));
445 return -1;
448 h->ctx = ctx;
450 ret = db_ctdb_transaction_fetch_start(h);
451 if (ret != 0) {
452 talloc_free(h);
453 return -1;
456 talloc_set_destructor(h, db_ctdb_transaction_destructor);
458 ctx->transaction = h;
460 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
462 return 0;
468 fetch a record inside a transaction
470 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
471 TALLOC_CTX *mem_ctx,
472 TDB_DATA key, TDB_DATA *data)
474 struct db_ctdb_transaction_handle *h = db->transaction;
475 NTSTATUS status;
477 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
479 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
480 *data = tdb_null;
481 } else if (!NT_STATUS_IS_OK(status)) {
482 return -1;
485 if (!h->in_replay) {
486 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
487 if (h->m_all == NULL) {
488 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
489 data->dsize = 0;
490 talloc_free(data->dptr);
491 return -1;
495 return 0;
499 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
500 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
502 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
503 TALLOC_CTX *mem_ctx,
504 TDB_DATA key)
506 struct db_record *result;
507 TDB_DATA ctdb_data;
509 if (!(result = talloc(mem_ctx, struct db_record))) {
510 DEBUG(0, ("talloc failed\n"));
511 return NULL;
514 result->private_data = ctx->transaction;
516 result->key.dsize = key.dsize;
517 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
518 if (result->key.dptr == NULL) {
519 DEBUG(0, ("talloc failed\n"));
520 TALLOC_FREE(result);
521 return NULL;
524 result->store = db_ctdb_store_transaction;
525 result->delete_rec = db_ctdb_delete_transaction;
527 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
528 if (ctdb_data.dptr == NULL) {
529 /* create the record */
530 result->value = tdb_null;
531 return result;
534 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
535 result->value.dptr = NULL;
537 if ((result->value.dsize != 0)
538 && !(result->value.dptr = (uint8 *)talloc_memdup(
539 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
540 result->value.dsize))) {
541 DEBUG(0, ("talloc failed\n"));
542 TALLOC_FREE(result);
545 SAFE_FREE(ctdb_data.dptr);
547 return result;
550 static int db_ctdb_record_destructor(struct db_record **recp)
552 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
553 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
554 rec->private_data, struct db_ctdb_transaction_handle);
555 int ret = h->ctx->db->transaction_commit(h->ctx->db);
556 if (ret != 0) {
557 DEBUG(0,(__location__ " transaction_commit failed\n"));
559 return 0;
563 auto-create a transaction for persistent databases
565 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
566 TALLOC_CTX *mem_ctx,
567 TDB_DATA key)
569 int res;
570 struct db_record *rec, **recp;
572 res = db_ctdb_transaction_start(ctx->db);
573 if (res == -1) {
574 return NULL;
577 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
578 if (rec == NULL) {
579 ctx->db->transaction_cancel(ctx->db);
580 return NULL;
583 /* destroy this transaction when we release the lock */
584 recp = talloc(rec, struct db_record *);
585 if (recp == NULL) {
586 ctx->db->transaction_cancel(ctx->db);
587 talloc_free(rec);
588 return NULL;
590 *recp = rec;
591 talloc_set_destructor(recp, db_ctdb_record_destructor);
592 return rec;
597 stores a record inside a transaction
599 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
600 TDB_DATA key, TDB_DATA data)
602 TALLOC_CTX *tmp_ctx = talloc_new(h);
603 int ret;
604 TDB_DATA rec;
605 struct ctdb_ltdb_header header;
606 NTSTATUS status;
608 /* we need the header so we can update the RSN */
609 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
610 if (rec.dptr == NULL) {
611 /* the record doesn't exist - create one with us as dmaster.
612 This is only safe because we are in a transaction and this
613 is a persistent database */
614 ZERO_STRUCT(header);
615 } else {
616 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
617 rec.dsize -= sizeof(struct ctdb_ltdb_header);
618 /* a special case, we are writing the same data that is there now */
619 if (data.dsize == rec.dsize &&
620 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
621 SAFE_FREE(rec.dptr);
622 talloc_free(tmp_ctx);
623 return 0;
625 SAFE_FREE(rec.dptr);
628 header.dmaster = get_my_vnn();
629 header.rsn++;
631 if (!h->in_replay) {
632 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
633 if (h->m_all == NULL) {
634 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
635 talloc_free(tmp_ctx);
636 return -1;
640 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
641 if (h->m_write == NULL) {
642 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
643 talloc_free(tmp_ctx);
644 return -1;
647 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
648 if (NT_STATUS_IS_OK(status)) {
649 ret = 0;
650 } else {
651 ret = -1;
654 talloc_free(tmp_ctx);
656 return ret;
661 a record store inside a transaction
663 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
665 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
666 rec->private_data, struct db_ctdb_transaction_handle);
667 int ret;
669 ret = db_ctdb_transaction_store(h, rec->key, data);
670 if (ret != 0) {
671 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
673 return NT_STATUS_OK;
677 a record delete inside a transaction
679 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
681 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
682 rec->private_data, struct db_ctdb_transaction_handle);
683 int ret;
685 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
686 if (ret != 0) {
687 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
689 return NT_STATUS_OK;
694 replay a transaction
696 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
698 int ret, i;
699 struct ctdb_rec_data *rec = NULL;
701 h->in_replay = true;
702 talloc_free(h->m_write);
703 h->m_write = NULL;
705 ret = db_ctdb_transaction_fetch_start(h);
706 if (ret != 0) {
707 return ret;
710 for (i=0;i<h->m_all->count;i++) {
711 TDB_DATA key, data;
713 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
714 if (rec == NULL) {
715 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
716 goto failed;
719 if (rec->reqid == 0) {
720 /* its a store */
721 if (db_ctdb_transaction_store(h, key, data) != 0) {
722 goto failed;
724 } else {
725 TDB_DATA data2;
726 TALLOC_CTX *tmp_ctx = talloc_new(h);
728 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
729 talloc_free(tmp_ctx);
730 goto failed;
732 if (data2.dsize != data.dsize ||
733 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
734 /* the record has changed on us - we have to give up */
735 talloc_free(tmp_ctx);
736 goto failed;
738 talloc_free(tmp_ctx);
742 return 0;
744 failed:
745 tdb_transaction_cancel(h->ctx->wtdb->tdb);
746 return -1;
751 commit a transaction
753 static int db_ctdb_transaction_commit(struct db_context *db)
755 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
756 struct db_ctdb_ctx);
757 NTSTATUS rets;
758 int ret;
759 int status;
760 int retries = 0;
761 struct db_ctdb_transaction_handle *h = ctx->transaction;
762 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
764 if (h == NULL) {
765 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
766 return -1;
769 if (h->nested_cancel) {
770 db->transaction_cancel(db);
771 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
772 return -1;
775 if (h->nesting != 0) {
776 h->nesting--;
777 return 0;
780 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
782 talloc_set_destructor(h, NULL);
784 /* our commit strategy is quite complex.
786 - we first try to commit the changes to all other nodes
788 - if that works, then we commit locally and we are done
790 - if a commit on another node fails, then we need to cancel
791 the transaction, then restart the transaction (thus
792 opening a window of time for a pending recovery to
793 complete), then replay the transaction, checking all the
794 reads and writes (checking that reads give the same data,
795 and writes succeed). Then we retry the transaction to the
796 other nodes
799 again:
800 if (h->m_write == NULL) {
801 /* no changes were made, potentially after a retry */
802 tdb_transaction_cancel(h->ctx->wtdb->tdb);
803 talloc_free(h);
804 ctx->transaction = NULL;
805 return 0;
808 /* tell ctdbd to commit to the other nodes */
809 rets = ctdbd_control_local(messaging_ctdbd_connection(),
810 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
811 h->ctx->db_id, 0,
812 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
813 if (!NT_STATUS_IS_OK(rets) || status != 0) {
814 tdb_transaction_cancel(h->ctx->wtdb->tdb);
815 sleep(1);
817 if (!NT_STATUS_IS_OK(rets)) {
818 failure_control = CTDB_CONTROL_TRANS2_ERROR;
819 } else {
820 /* work out what error code we will give if we
821 have to fail the operation */
822 switch ((enum ctdb_trans2_commit_error)status) {
823 case CTDB_TRANS2_COMMIT_SUCCESS:
824 case CTDB_TRANS2_COMMIT_SOMEFAIL:
825 case CTDB_TRANS2_COMMIT_TIMEOUT:
826 failure_control = CTDB_CONTROL_TRANS2_ERROR;
827 break;
828 case CTDB_TRANS2_COMMIT_ALLFAIL:
829 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
830 break;
834 if (++retries == 5) {
835 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
836 h->ctx->db_id, retries, (unsigned)failure_control));
837 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
838 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
839 tdb_null, NULL, NULL, NULL);
840 h->ctx->transaction = NULL;
841 talloc_free(h);
842 ctx->transaction = NULL;
843 return -1;
846 if (ctdb_replay_transaction(h) != 0) {
847 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
848 (unsigned)failure_control));
849 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
850 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
851 tdb_null, NULL, NULL, NULL);
852 h->ctx->transaction = NULL;
853 talloc_free(h);
854 ctx->transaction = NULL;
855 return -1;
857 goto again;
858 } else {
859 failure_control = CTDB_CONTROL_TRANS2_ERROR;
862 /* do the real commit locally */
863 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
864 if (ret != 0) {
865 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
866 (unsigned)failure_control));
867 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
868 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
869 h->ctx->transaction = NULL;
870 talloc_free(h);
871 return ret;
874 /* tell ctdbd that we are finished with our local commit */
875 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
876 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
877 tdb_null, NULL, NULL, NULL);
878 h->ctx->transaction = NULL;
879 talloc_free(h);
880 return 0;
885 cancel a transaction
887 static int db_ctdb_transaction_cancel(struct db_context *db)
889 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
890 struct db_ctdb_ctx);
891 struct db_ctdb_transaction_handle *h = ctx->transaction;
893 if (h == NULL) {
894 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
895 return -1;
898 if (h->nesting != 0) {
899 h->nesting--;
900 h->nested_cancel = true;
901 return 0;
904 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
906 ctx->transaction = NULL;
907 talloc_free(h);
908 return 0;
912 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
914 struct db_ctdb_rec *crec = talloc_get_type_abort(
915 rec->private_data, struct db_ctdb_rec);
917 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
922 static NTSTATUS db_ctdb_delete(struct db_record *rec)
924 TDB_DATA data;
927 * We have to store the header with empty data. TODO: Fix the
928 * tdb-level cleanup
931 ZERO_STRUCT(data);
933 return db_ctdb_store(rec, data, 0);
937 static int db_ctdb_record_destr(struct db_record* data)
939 struct db_ctdb_rec *crec = talloc_get_type_abort(
940 data->private_data, struct db_ctdb_rec);
942 DEBUG(10, (DEBUGLEVEL > 10
943 ? "Unlocking db %u key %s\n"
944 : "Unlocking db %u key %.20s\n",
945 (int)crec->ctdb_ctx->db_id,
946 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
947 data->key.dsize)));
949 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
950 DEBUG(0, ("tdb_chainunlock failed\n"));
951 return -1;
954 return 0;
957 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
958 TALLOC_CTX *mem_ctx,
959 TDB_DATA key,
960 bool persistent)
962 struct db_record *result;
963 struct db_ctdb_rec *crec;
964 NTSTATUS status;
965 TDB_DATA ctdb_data;
966 int migrate_attempts = 0;
968 if (!(result = talloc(mem_ctx, struct db_record))) {
969 DEBUG(0, ("talloc failed\n"));
970 return NULL;
973 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
974 DEBUG(0, ("talloc failed\n"));
975 TALLOC_FREE(result);
976 return NULL;
979 result->private_data = (void *)crec;
980 crec->ctdb_ctx = ctx;
982 result->key.dsize = key.dsize;
983 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
984 if (result->key.dptr == NULL) {
985 DEBUG(0, ("talloc failed\n"));
986 TALLOC_FREE(result);
987 return NULL;
991 * Do a blocking lock on the record
993 again:
995 if (DEBUGLEVEL >= 10) {
996 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
997 DEBUG(10, (DEBUGLEVEL > 10
998 ? "Locking db %u key %s\n"
999 : "Locking db %u key %.20s\n",
1000 (int)crec->ctdb_ctx->db_id, keystr));
1001 TALLOC_FREE(keystr);
1004 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1005 DEBUG(3, ("tdb_chainlock failed\n"));
1006 TALLOC_FREE(result);
1007 return NULL;
1010 result->store = db_ctdb_store;
1011 result->delete_rec = db_ctdb_delete;
1012 talloc_set_destructor(result, db_ctdb_record_destr);
1014 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1017 * See if we have a valid record and we are the dmaster. If so, we can
1018 * take the shortcut and just return it.
1021 if ((ctdb_data.dptr == NULL) ||
1022 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1023 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1024 #if 0
1025 || (random() % 2 != 0)
1026 #endif
1028 SAFE_FREE(ctdb_data.dptr);
1029 tdb_chainunlock(ctx->wtdb->tdb, key);
1030 talloc_set_destructor(result, NULL);
1032 migrate_attempts += 1;
1034 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1035 ctdb_data.dptr, ctdb_data.dptr ?
1036 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1037 get_my_vnn()));
1039 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1040 if (!NT_STATUS_IS_OK(status)) {
1041 DEBUG(5, ("ctdb_migrate failed: %s\n",
1042 nt_errstr(status)));
1043 TALLOC_FREE(result);
1044 return NULL;
1046 /* now its migrated, try again */
1047 goto again;
1050 if (migrate_attempts > 10) {
1051 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1052 migrate_attempts));
1055 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1057 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1058 result->value.dptr = NULL;
1060 if ((result->value.dsize != 0)
1061 && !(result->value.dptr = (uint8 *)talloc_memdup(
1062 result, ctdb_data.dptr + sizeof(crec->header),
1063 result->value.dsize))) {
1064 DEBUG(0, ("talloc failed\n"));
1065 TALLOC_FREE(result);
1068 SAFE_FREE(ctdb_data.dptr);
1070 return result;
1073 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1074 TALLOC_CTX *mem_ctx,
1075 TDB_DATA key)
1077 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1078 struct db_ctdb_ctx);
1080 if (ctx->transaction != NULL) {
1081 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1084 if (db->persistent) {
1085 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1088 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1092 fetch (unlocked, no migration) operation on ctdb
1094 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1095 TDB_DATA key, TDB_DATA *data)
1097 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1098 struct db_ctdb_ctx);
1099 NTSTATUS status;
1100 TDB_DATA ctdb_data;
1102 if (ctx->transaction) {
1103 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1106 /* try a direct fetch */
1107 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1110 * See if we have a valid record and we are the dmaster. If so, we can
1111 * take the shortcut and just return it.
1112 * we bypass the dmaster check for persistent databases
1114 if ((ctdb_data.dptr != NULL) &&
1115 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1116 (db->persistent ||
1117 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1118 /* we are the dmaster - avoid the ctdb protocol op */
1120 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1121 if (data->dsize == 0) {
1122 SAFE_FREE(ctdb_data.dptr);
1123 data->dptr = NULL;
1124 return 0;
1127 data->dptr = (uint8 *)talloc_memdup(
1128 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1129 data->dsize);
1131 SAFE_FREE(ctdb_data.dptr);
1133 if (data->dptr == NULL) {
1134 return -1;
1136 return 0;
1139 SAFE_FREE(ctdb_data.dptr);
1141 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1142 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1143 if (!NT_STATUS_IS_OK(status)) {
1144 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1145 return -1;
1148 return 0;
/*
 * Context handed to the traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* user callback invoked for each record */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1157 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1159 struct traverse_state *state = (struct traverse_state *)private_data;
1160 struct db_record *rec;
1161 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1162 /* we have to give them a locked record to prevent races */
1163 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1164 if (rec && rec->value.dsize > 0) {
1165 state->fn(rec, state->private_data);
1167 talloc_free(tmp_ctx);
1170 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1171 void *private_data)
1173 struct traverse_state *state = (struct traverse_state *)private_data;
1174 struct db_record *rec;
1175 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1176 int ret = 0;
1177 /* we have to give them a locked record to prevent races */
1178 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1179 if (rec && rec->value.dsize > 0) {
1180 ret = state->fn(rec, state->private_data);
1182 talloc_free(tmp_ctx);
1183 return ret;
1186 static int db_ctdb_traverse(struct db_context *db,
1187 int (*fn)(struct db_record *rec,
1188 void *private_data),
1189 void *private_data)
1191 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1192 struct db_ctdb_ctx);
1193 struct traverse_state state;
1195 state.db = db;
1196 state.fn = fn;
1197 state.private_data = private_data;
1199 if (db->persistent) {
1200 /* for persistent databases we don't need to do a ctdb traverse,
1201 we can do a faster local traverse */
1202 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1206 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1207 return 0;
1210 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1212 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1215 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1217 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1220 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1222 struct traverse_state *state = (struct traverse_state *)private_data;
1223 struct db_record rec;
1224 rec.key = key;
1225 rec.value = data;
1226 rec.store = db_ctdb_store_deny;
1227 rec.delete_rec = db_ctdb_delete_deny;
1228 rec.private_data = state->db;
1229 state->fn(&rec, state->private_data);
1232 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1233 void *private_data)
1235 struct traverse_state *state = (struct traverse_state *)private_data;
1236 struct db_record rec;
1237 rec.key = kbuf;
1238 rec.value = dbuf;
1239 rec.store = db_ctdb_store_deny;
1240 rec.delete_rec = db_ctdb_delete_deny;
1241 rec.private_data = state->db;
1243 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1244 /* a deleted record */
1245 return 0;
1247 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1248 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1250 return state->fn(&rec, state->private_data);
1253 static int db_ctdb_traverse_read(struct db_context *db,
1254 int (*fn)(struct db_record *rec,
1255 void *private_data),
1256 void *private_data)
1258 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1259 struct db_ctdb_ctx);
1260 struct traverse_state state;
1262 state.db = db;
1263 state.fn = fn;
1264 state.private_data = private_data;
1266 if (db->persistent) {
1267 /* for persistent databases we don't need to do a ctdb traverse,
1268 we can do a faster local traverse */
1269 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1272 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1273 return 0;
1276 static int db_ctdb_get_seqnum(struct db_context *db)
1278 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1279 struct db_ctdb_ctx);
1280 return tdb_get_seqnum(ctx->wtdb->tdb);
1283 static int db_ctdb_get_flags(struct db_context *db)
1285 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1286 struct db_ctdb_ctx);
1287 return tdb_get_flags(ctx->wtdb->tdb);
1290 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1291 const char *name,
1292 int hash_size, int tdb_flags,
1293 int open_flags, mode_t mode)
1295 struct db_context *result;
1296 struct db_ctdb_ctx *db_ctdb;
1297 char *db_path;
1299 if (!lp_clustering()) {
1300 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1301 return NULL;
1304 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1305 DEBUG(0, ("talloc failed\n"));
1306 TALLOC_FREE(result);
1307 return NULL;
1310 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1311 DEBUG(0, ("talloc failed\n"));
1312 TALLOC_FREE(result);
1313 return NULL;
1316 db_ctdb->transaction = NULL;
1317 db_ctdb->db = result;
1319 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1320 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1321 TALLOC_FREE(result);
1322 return NULL;
1325 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1327 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1329 /* only pass through specific flags */
1330 tdb_flags &= TDB_SEQNUM;
1332 /* honor permissions if user has specified O_CREAT */
1333 if (open_flags & O_CREAT) {
1334 chmod(db_path, mode);
1337 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1338 if (db_ctdb->wtdb == NULL) {
1339 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1340 TALLOC_FREE(result);
1341 return NULL;
1343 talloc_free(db_path);
1345 result->private_data = (void *)db_ctdb;
1346 result->fetch_locked = db_ctdb_fetch_locked;
1347 result->fetch = db_ctdb_fetch;
1348 result->traverse = db_ctdb_traverse;
1349 result->traverse_read = db_ctdb_traverse_read;
1350 result->get_seqnum = db_ctdb_get_seqnum;
1351 result->get_flags = db_ctdb_get_flags;
1352 result->transaction_start = db_ctdb_transaction_start;
1353 result->transaction_commit = db_ctdb_transaction_commit;
1354 result->transaction_cancel = db_ctdb_transaction_cancel;
1356 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1357 name, db_ctdb->db_id));
1359 return result;
1361 #endif