allow nested ctdb transactions in the same manner that they are
[Samba.git] / source / lib / dbwrap_ctdb.c
blob7c1ef8fed86d2aeaf7db3ef4ccaf4c579ab2055e
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State of one open transaction on a ctdb database.
 *
 *  ctx           - owning database context
 *  in_replay     - set while ctdb_replay_transaction() re-runs the recorded
 *                  operations; fetch/store skip appending to m_all while set
 *  m_all         - marshalled log of reads (reqid 1) and writes (reqid 0)
 *  m_write       - marshalled writes only (with ltdb header); this is the
 *                  blob handed to ctdbd at commit time
 *  nesting       - number of nested transaction_start() calls still open
 *  nested_cancel - set when a nested level was cancelled; makes the next
 *                  commit fail
 */
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
28 bool in_replay;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer *m_all;
33 struct ctdb_marshall_buffer *m_write;
34 uint32_t nesting;
35 bool nested_cancel;
/*
 * Per-database private state, stored in db_context->private_data.
 */
38 struct db_ctdb_ctx {
/* back pointer to the generic db_context (set in db_open_ctdb) */
39 struct db_context *db;
/* the local tdb file, opened with tdb_wrap_open() */
40 struct tdb_wrap *wtdb;
/* database id filled in by ctdbd_db_attach() */
41 uint32 db_id;
/* currently open transaction, or NULL when none is active */
42 struct db_ctdb_transaction_handle *transaction;
/*
 * Private data of a record returned by fetch_locked_internal().
 */
45 struct db_ctdb_rec {
/* database the record belongs to */
46 struct db_ctdb_ctx *ctdb_ctx;
/* copy of the ltdb header read with the record; db_ctdb_store() writes it back in front of the data */
47 struct ctdb_ltdb_header header;
/* forward declaration: needed by db_ctdb_transaction_fetch_start(), which is defined before fetch_locked_internal() */
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51 TALLOC_CTX *mem_ctx,
52 TDB_DATA key,
53 bool persistent);
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
57 NTSTATUS status;
58 enum TDB_ERROR tret = tdb_error(tdb);
60 switch (tret) {
61 case TDB_ERR_EXISTS:
62 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 break;
64 case TDB_ERR_NOEXIST:
65 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 break;
67 default:
68 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69 break;
72 return status;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
81 of the record
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
84 TDB_DATA key,
85 struct ctdb_ltdb_header *header,
86 TDB_DATA data)
88 size_t length;
89 struct ctdb_rec_data *d;
91 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
92 data.dsize + (header?sizeof(*header):0);
93 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94 if (d == NULL) {
95 return NULL;
97 d->length = length;
98 d->reqid = reqid;
99 d->keylen = key.dsize;
100 memcpy(&d->data[0], key.dptr, key.dsize);
101 if (header) {
102 d->datalen = data.dsize + sizeof(*header);
103 memcpy(&d->data[key.dsize], header, sizeof(*header));
104 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105 } else {
106 d->datalen = data.dsize;
107 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
109 return d;
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
115 struct ctdb_marshall_buffer *m,
116 uint64_t db_id,
117 uint32_t reqid,
118 TDB_DATA key,
119 struct ctdb_ltdb_header *header,
120 TDB_DATA data)
122 struct ctdb_rec_data *r;
123 size_t m_size, r_size;
124 struct ctdb_marshall_buffer *m2;
126 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
127 if (r == NULL) {
128 talloc_free(m);
129 return NULL;
132 if (m == NULL) {
133 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
134 if (m == NULL) {
135 return NULL;
137 m->db_id = db_id;
140 m_size = talloc_get_size(m);
141 r_size = talloc_get_size(r);
143 m2 = talloc_realloc_size(mem_ctx, m, m_size + r_size);
144 if (m2 == NULL) {
145 talloc_free(m);
146 return NULL;
149 memcpy(m_size + (uint8_t *)m2, r, r_size);
151 talloc_free(r);
153 m2->count++;
155 return m2;
158 /* we've finished marshalling, return a data blob with the marshalled records */
159 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
161 TDB_DATA data;
162 data.dptr = (uint8_t *)m;
163 data.dsize = talloc_get_size(m);
164 return data;
168 loop over a marshalling buffer
170 - pass r==NULL to start
171 - loop the number of times indicated by m->count
173 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
174 uint32_t *reqid,
175 struct ctdb_ltdb_header *header,
176 TDB_DATA *key, TDB_DATA *data)
178 if (r == NULL) {
179 r = (struct ctdb_rec_data *)&m->data[0];
180 } else {
181 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
184 if (reqid != NULL) {
185 *reqid = r->reqid;
188 if (key != NULL) {
189 key->dptr = &r->data[0];
190 key->dsize = r->keylen;
192 if (data != NULL) {
193 data->dptr = &r->data[r->keylen];
194 data->dsize = r->datalen;
195 if (header != NULL) {
196 data->dptr += sizeof(*header);
197 data->dsize -= sizeof(*header);
201 if (header != NULL) {
202 if (r->datalen < sizeof(*header)) {
203 return NULL;
205 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
208 return r;
213 /* talloc destructor for a transaction handle: cancels the underlying tdb transaction when the handle is freed */
214 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
216 tdb_transaction_cancel(h->ctx->wtdb->tdb);
217 return 0;
220 /* start a transaction on a database */
221 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
223 struct db_record *rh;
224 TDB_DATA key;
225 TALLOC_CTX *tmp_ctx;
226 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
227 int ret;
228 struct db_ctdb_ctx *ctx = h->ctx;
229 TDB_DATA data;
231 key.dptr = discard_const(keyname);
232 key.dsize = strlen(keyname);
234 again:
235 tmp_ctx = talloc_new(h);
237 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
238 if (rh == NULL) {
239 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
240 talloc_free(tmp_ctx);
241 return -1;
243 talloc_free(rh);
245 ret = tdb_transaction_start(ctx->wtdb->tdb);
246 if (ret != 0) {
247 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
248 talloc_free(tmp_ctx);
249 return -1;
252 data = tdb_fetch(ctx->wtdb->tdb, key);
253 if ((data.dptr == NULL) ||
254 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
255 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
256 SAFE_FREE(data.dptr);
257 tdb_transaction_cancel(ctx->wtdb->tdb);
258 talloc_free(tmp_ctx);
259 goto again;
262 SAFE_FREE(data.dptr);
263 talloc_free(tmp_ctx);
265 return 0;
269 /* start a transaction on a database */
270 static int db_ctdb_transaction_start(struct db_context *db)
272 struct db_ctdb_transaction_handle *h;
273 int ret;
274 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
275 struct db_ctdb_ctx);
277 if (!db->persistent) {
278 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
279 ctx->db_id));
280 return -1;
283 if (ctx->transaction) {
284 ctx->transaction->nesting++;
285 return 0;
288 h = talloc_zero(db, struct db_ctdb_transaction_handle);
289 if (h == NULL) {
290 DEBUG(0,(__location__ " oom for transaction handle\n"));
291 return -1;
294 h->ctx = ctx;
296 ret = db_ctdb_transaction_fetch_start(h);
297 if (ret != 0) {
298 talloc_free(h);
299 return -1;
302 talloc_set_destructor(h, db_ctdb_transaction_destructor);
304 ctx->transaction = h;
306 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
308 return 0;
314 fetch a record inside a transaction
316 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
317 TALLOC_CTX *mem_ctx,
318 TDB_DATA key, TDB_DATA *data)
320 struct db_ctdb_transaction_handle *h = db->transaction;
322 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
324 if (data->dptr != NULL) {
325 uint8_t *oldptr = (uint8_t *)data->dptr;
326 data->dsize -= sizeof(struct ctdb_ltdb_header);
327 if (data->dsize == 0) {
328 data->dptr = NULL;
329 } else {
330 data->dptr = (uint8 *)
331 talloc_memdup(
332 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
333 data->dsize);
335 SAFE_FREE(oldptr);
336 if (data->dptr == NULL && data->dsize != 0) {
337 return -1;
341 if (!h->in_replay) {
342 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
343 if (h->m_all == NULL) {
344 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
345 data->dsize = 0;
346 talloc_free(data->dptr);
347 return -1;
351 return 0;
/* forward declarations: db_ctdb_fetch_locked_transaction() below installs these as the record's store/delete methods */
355 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
356 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
358 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
359 TALLOC_CTX *mem_ctx,
360 TDB_DATA key)
362 struct db_record *result;
363 TDB_DATA ctdb_data;
365 if (!(result = talloc(mem_ctx, struct db_record))) {
366 DEBUG(0, ("talloc failed\n"));
367 return NULL;
370 result->private_data = ctx->transaction;
372 result->key.dsize = key.dsize;
373 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
374 if (result->key.dptr == NULL) {
375 DEBUG(0, ("talloc failed\n"));
376 TALLOC_FREE(result);
377 return NULL;
380 result->store = db_ctdb_store_transaction;
381 result->delete_rec = db_ctdb_delete_transaction;
383 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
384 if (ctdb_data.dptr == NULL) {
385 /* create the record */
386 result->value = tdb_null;
387 return result;
390 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
391 result->value.dptr = NULL;
393 if ((result->value.dsize != 0)
394 && !(result->value.dptr = (uint8 *)talloc_memdup(
395 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
396 result->value.dsize))) {
397 DEBUG(0, ("talloc failed\n"));
398 TALLOC_FREE(result);
401 SAFE_FREE(ctdb_data.dptr);
403 return result;
406 static int db_ctdb_record_destructor(struct db_record *rec)
408 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
409 rec->private_data, struct db_ctdb_transaction_handle);
410 int ret = h->ctx->db->transaction_commit(h->ctx->db);
411 if (ret != 0) {
412 DEBUG(0,(__location__ " transaction_commit failed\n"));
414 return 0;
418 auto-create a transaction for persistent databases
420 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
421 TALLOC_CTX *mem_ctx,
422 TDB_DATA key)
424 int res;
425 struct db_record *rec;
427 res = db_ctdb_transaction_start(ctx->db);
428 if (res == -1) {
429 return NULL;
432 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
433 if (rec == NULL) {
434 ctx->db->transaction_cancel(ctx->db);
435 return NULL;
438 /* destroy this transaction when we release the lock */
439 talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
440 return rec;
445 stores a record inside a transaction
447 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
448 TDB_DATA key, TDB_DATA data)
450 TALLOC_CTX *tmp_ctx = talloc_new(h);
451 int ret;
452 TDB_DATA rec;
453 struct ctdb_ltdb_header header;
455 /* we need the header so we can update the RSN */
456 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
457 if (rec.dptr == NULL) {
458 /* the record doesn't exist - create one with us as dmaster.
459 This is only safe because we are in a transaction and this
460 is a persistent database */
461 ZERO_STRUCT(header);
462 header.dmaster = get_my_vnn();
463 } else {
464 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
465 rec.dsize -= sizeof(struct ctdb_ltdb_header);
466 /* a special case, we are writing the same data that is there now */
467 if (data.dsize == rec.dsize &&
468 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
469 SAFE_FREE(rec.dptr);
470 talloc_free(tmp_ctx);
471 return 0;
473 SAFE_FREE(rec.dptr);
476 header.rsn++;
478 if (!h->in_replay) {
479 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
480 if (h->m_all == NULL) {
481 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
482 talloc_free(tmp_ctx);
483 return -1;
487 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
488 if (h->m_write == NULL) {
489 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
490 talloc_free(tmp_ctx);
491 return -1;
494 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
495 rec.dptr = talloc_size(tmp_ctx, rec.dsize);
496 if (rec.dptr == NULL) {
497 DEBUG(0,(__location__ " Failed to alloc record\n"));
498 talloc_free(tmp_ctx);
499 return -1;
501 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
502 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
504 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
506 talloc_free(tmp_ctx);
508 return ret;
513 a record store inside a transaction
515 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
517 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
518 rec->private_data, struct db_ctdb_transaction_handle);
519 int ret;
521 ret = db_ctdb_transaction_store(h, rec->key, data);
522 if (ret != 0) {
523 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
525 return NT_STATUS_OK;
529 a record delete inside a transaction
531 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
533 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
534 rec->private_data, struct db_ctdb_transaction_handle);
535 int ret;
537 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
538 if (ret != 0) {
539 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
541 return NT_STATUS_OK;
546 replay a transaction
548 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
550 int ret, i;
551 struct ctdb_rec_data *rec = NULL;
553 h->in_replay = true;
554 talloc_free(h->m_write);
555 h->m_write = NULL;
557 ret = db_ctdb_transaction_fetch_start(h);
558 if (ret != 0) {
559 return ret;
562 for (i=0;i<h->m_all->count;i++) {
563 TDB_DATA key, data;
565 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
566 if (rec == NULL) {
567 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
568 goto failed;
571 if (rec->reqid == 0) {
572 /* its a store */
573 if (db_ctdb_transaction_store(h, key, data) != 0) {
574 goto failed;
576 } else {
577 TDB_DATA data2;
578 TALLOC_CTX *tmp_ctx = talloc_new(h);
580 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
581 talloc_free(tmp_ctx);
582 goto failed;
584 if (data2.dsize != data.dsize ||
585 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
586 /* the record has changed on us - we have to give up */
587 talloc_free(tmp_ctx);
588 goto failed;
590 talloc_free(tmp_ctx);
594 return 0;
596 failed:
597 tdb_transaction_cancel(h->ctx->wtdb->tdb);
598 return -1;
603 commit a transaction
/*
 * transaction_commit method of the ctdb db_context.
 *
 * Nested levels only decrement the nesting counter; a commit after a
 * nested cancel fails. The outermost commit sends the marshalled writes
 * (m_write) to ctdbd, and on success commits the local tdb transaction
 * and tells ctdbd it has finished. If ctdbd reports a failure, the local
 * transaction is cancelled, replayed and the commit retried, giving up
 * on the 5th failed attempt.
 *
 * Returns 0 on success (including "nothing to write"), non-zero on
 * failure. On every outermost-level exit path the handle is freed.
 */
605 static int db_ctdb_transaction_commit(struct db_context *db)
607 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
608 struct db_ctdb_ctx);
609 NTSTATUS rets;
610 int ret;
611 int status;
612 int retries = 0;
613 struct db_ctdb_transaction_handle *h = ctx->transaction;
614 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
616 if (h == NULL) {
617 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
618 return -1;
/* a nested level was cancelled earlier: this commit is turned into a cancel and reported as failed */
621 if (h->nested_cancel) {
622 db->transaction_cancel(db);
623 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
624 return -1;
/* leaving a nested level: only the outermost commit does the real work */
627 if (h->nesting != 0) {
628 h->nesting--;
629 return 0;
632 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
/* the tdb transaction is cancelled/committed explicitly below, so drop the destructor that would cancel it when h is freed */
634 talloc_set_destructor(h, NULL);
636 /* our commit strategy is quite complex.
638 - we first try to commit the changes to all other nodes
640 - if that works, then we commit locally and we are done
642 - if a commit on another node fails, then we need to cancel
643 the transaction, then restart the transaction (thus
644 opening a window of time for a pending recovery to
645 complete), then replay the transaction, checking all the
646 reads and writes (checking that reads give the same data,
647 and writes succeed). Then we retry the transaction to the
648 other nodes
651 again:
652 if (h->m_write == NULL) {
653 /* no changes were made, potentially after a retry */
654 tdb_transaction_cancel(h->ctx->wtdb->tdb);
655 talloc_free(h);
656 ctx->transaction = NULL;
657 return 0;
660 /* tell ctdbd to commit to the other nodes */
661 rets = ctdbd_control_local(messaging_ctdbd_connection(),
662 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
663 h->ctx->db_id, 0,
664 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
/* failure: either the control itself failed (rets) or ctdbd returned a non-zero commit status */
665 if (!NT_STATUS_IS_OK(rets) || status != 0) {
666 tdb_transaction_cancel(h->ctx->wtdb->tdb);
667 sleep(1);
669 if (!NT_STATUS_IS_OK(rets)) {
670 failure_control = CTDB_CONTROL_TRANS2_ERROR;
671 } else {
672 /* work out what error code we will give if we
673 have to fail the operation */
674 switch ((enum ctdb_trans2_commit_error)status) {
675 case CTDB_TRANS2_COMMIT_SUCCESS:
676 case CTDB_TRANS2_COMMIT_SOMEFAIL:
677 case CTDB_TRANS2_COMMIT_TIMEOUT:
678 failure_control = CTDB_CONTROL_TRANS2_ERROR;
679 break;
680 case CTDB_TRANS2_COMMIT_ALLFAIL:
681 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
682 break;
/* give up on the 5th failed attempt: notify ctdbd with the chosen control and drop the handle */
686 if (++retries == 5) {
687 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
688 h->ctx->db_id, retries, (unsigned)failure_control));
689 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
690 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
691 tdb_null, NULL, NULL, NULL);
692 h->ctx->transaction = NULL;
693 talloc_free(h);
694 ctx->transaction = NULL;
695 return -1;
/* restart the tdb transaction and re-apply the logged operations; this rebuilds m_write for the retry */
698 if (ctdb_replay_transaction(h) != 0) {
699 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
700 (unsigned)failure_control));
701 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
702 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
703 tdb_null, NULL, NULL, NULL);
704 h->ctx->transaction = NULL;
705 talloc_free(h);
706 ctx->transaction = NULL;
707 return -1;
709 goto again;
710 } else {
711 failure_control = CTDB_CONTROL_TRANS2_ERROR;
714 /* do the real commit locally */
715 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
716 if (ret != 0) {
717 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
718 (unsigned)failure_control));
719 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
720 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
721 h->ctx->transaction = NULL;
722 talloc_free(h);
723 return ret;
726 /* tell ctdbd that we are finished with our local commit */
727 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
728 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
729 tdb_null, NULL, NULL, NULL);
730 h->ctx->transaction = NULL;
731 talloc_free(h);
732 return 0;
737 cancel a transaction
739 static int db_ctdb_transaction_cancel(struct db_context *db)
741 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
742 struct db_ctdb_ctx);
743 struct db_ctdb_transaction_handle *h = ctx->transaction;
745 if (h == NULL) {
746 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
747 return -1;
750 if (h->nesting != 0) {
751 h->nesting--;
752 h->nested_cancel = true;
753 return 0;
756 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
758 ctx->transaction = NULL;
759 talloc_free(h);
760 return 0;
764 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
766 struct db_ctdb_rec *crec = talloc_get_type_abort(
767 rec->private_data, struct db_ctdb_rec);
768 TDB_DATA cdata;
769 int ret;
771 cdata.dsize = sizeof(crec->header) + data.dsize;
773 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
774 return NT_STATUS_NO_MEMORY;
777 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
778 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
780 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
782 SAFE_FREE(cdata.dptr);
784 return (ret == 0) ? NT_STATUS_OK
785 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
790 static NTSTATUS db_ctdb_delete(struct db_record *rec)
792 TDB_DATA data;
795 * We have to store the header with empty data. TODO: Fix the
796 * tdb-level cleanup
799 ZERO_STRUCT(data);
801 return db_ctdb_store(rec, data, 0);
805 static int db_ctdb_record_destr(struct db_record* data)
807 struct db_ctdb_rec *crec = talloc_get_type_abort(
808 data->private_data, struct db_ctdb_rec);
810 DEBUG(10, (DEBUGLEVEL > 10
811 ? "Unlocking db %u key %s\n"
812 : "Unlocking db %u key %.20s\n",
813 (int)crec->ctdb_ctx->db_id,
814 hex_encode(data, (unsigned char *)data->key.dptr,
815 data->key.dsize)));
817 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
818 DEBUG(0, ("tdb_chainunlock failed\n"));
819 return -1;
822 return 0;
825 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
826 TALLOC_CTX *mem_ctx,
827 TDB_DATA key,
828 bool persistent)
830 struct db_record *result;
831 struct db_ctdb_rec *crec;
832 NTSTATUS status;
833 TDB_DATA ctdb_data;
834 int migrate_attempts = 0;
836 if (!(result = talloc(mem_ctx, struct db_record))) {
837 DEBUG(0, ("talloc failed\n"));
838 return NULL;
841 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
842 DEBUG(0, ("talloc failed\n"));
843 TALLOC_FREE(result);
844 return NULL;
847 result->private_data = (void *)crec;
848 crec->ctdb_ctx = ctx;
850 result->key.dsize = key.dsize;
851 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
852 if (result->key.dptr == NULL) {
853 DEBUG(0, ("talloc failed\n"));
854 TALLOC_FREE(result);
855 return NULL;
859 * Do a blocking lock on the record
861 again:
863 if (DEBUGLEVEL >= 10) {
864 char *keystr = hex_encode(result, key.dptr, key.dsize);
865 DEBUG(10, (DEBUGLEVEL > 10
866 ? "Locking db %u key %s\n"
867 : "Locking db %u key %.20s\n",
868 (int)crec->ctdb_ctx->db_id, keystr));
869 TALLOC_FREE(keystr);
872 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
873 DEBUG(3, ("tdb_chainlock failed\n"));
874 TALLOC_FREE(result);
875 return NULL;
878 result->store = db_ctdb_store;
879 result->delete_rec = db_ctdb_delete;
880 talloc_set_destructor(result, db_ctdb_record_destr);
882 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
885 * See if we have a valid record and we are the dmaster. If so, we can
886 * take the shortcut and just return it.
889 if ((ctdb_data.dptr == NULL) ||
890 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
891 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
892 #if 0
893 || (random() % 2 != 0)
894 #endif
896 SAFE_FREE(ctdb_data.dptr);
897 tdb_chainunlock(ctx->wtdb->tdb, key);
898 talloc_set_destructor(result, NULL);
900 migrate_attempts += 1;
902 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
903 ctdb_data.dptr, ctdb_data.dptr ?
904 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
905 get_my_vnn()));
907 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
908 if (!NT_STATUS_IS_OK(status)) {
909 DEBUG(5, ("ctdb_migrate failed: %s\n",
910 nt_errstr(status)));
911 TALLOC_FREE(result);
912 return NULL;
914 /* now its migrated, try again */
915 goto again;
918 if (migrate_attempts > 10) {
919 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
920 migrate_attempts));
923 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
925 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
926 result->value.dptr = NULL;
928 if ((result->value.dsize != 0)
929 && !(result->value.dptr = (uint8 *)talloc_memdup(
930 result, ctdb_data.dptr + sizeof(crec->header),
931 result->value.dsize))) {
932 DEBUG(0, ("talloc failed\n"));
933 TALLOC_FREE(result);
936 SAFE_FREE(ctdb_data.dptr);
938 return result;
941 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
942 TALLOC_CTX *mem_ctx,
943 TDB_DATA key)
945 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
946 struct db_ctdb_ctx);
948 if (ctx->transaction != NULL) {
949 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
952 if (db->persistent) {
953 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
956 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
960 fetch (unlocked, no migration) operation on ctdb
962 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
963 TDB_DATA key, TDB_DATA *data)
965 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
966 struct db_ctdb_ctx);
967 NTSTATUS status;
968 TDB_DATA ctdb_data;
970 if (ctx->transaction) {
971 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
974 /* try a direct fetch */
975 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
978 * See if we have a valid record and we are the dmaster. If so, we can
979 * take the shortcut and just return it.
980 * we bypass the dmaster check for persistent databases
982 if ((ctdb_data.dptr != NULL) &&
983 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
984 (db->persistent ||
985 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
986 /* we are the dmaster - avoid the ctdb protocol op */
988 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
989 if (data->dsize == 0) {
990 SAFE_FREE(ctdb_data.dptr);
991 data->dptr = NULL;
992 return 0;
995 data->dptr = (uint8 *)talloc_memdup(
996 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
997 data->dsize);
999 SAFE_FREE(ctdb_data.dptr);
1001 if (data->dptr == NULL) {
1002 return -1;
1004 return 0;
1007 SAFE_FREE(ctdb_data.dptr);
1009 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1010 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1011 if (!NT_STATUS_IS_OK(status)) {
1012 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1013 return -1;
1016 return 0;
/*
 * Context handed through the tdb/ctdbd traverse callbacks.
 */
1019 struct traverse_state {
/* database being traversed */
1020 struct db_context *db;
/* caller's per-record callback */
1021 int (*fn)(struct db_record *rec, void *private_data);
/* opaque pointer passed back to fn */
1022 void *private_data;
1025 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1027 struct traverse_state *state = (struct traverse_state *)private_data;
1028 struct db_record *rec;
1029 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1030 /* we have to give them a locked record to prevent races */
1031 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1032 if (rec && rec->value.dsize > 0) {
1033 state->fn(rec, state->private_data);
1035 talloc_free(tmp_ctx);
1038 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1039 void *private_data)
1041 struct traverse_state *state = (struct traverse_state *)private_data;
1042 struct db_record *rec;
1043 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1044 int ret = 0;
1045 /* we have to give them a locked record to prevent races */
1046 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1047 if (rec && rec->value.dsize > 0) {
1048 ret = state->fn(rec, state->private_data);
1050 talloc_free(tmp_ctx);
1051 return ret;
1054 static int db_ctdb_traverse(struct db_context *db,
1055 int (*fn)(struct db_record *rec,
1056 void *private_data),
1057 void *private_data)
1059 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1060 struct db_ctdb_ctx);
1061 struct traverse_state state;
1063 state.db = db;
1064 state.fn = fn;
1065 state.private_data = private_data;
1067 if (db->persistent) {
1068 /* for persistent databases we don't need to do a ctdb traverse,
1069 we can do a faster local traverse */
1070 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1074 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1075 return 0;
/* store method installed on records handed out by the read-only traverse callbacks: always refuses with NT_STATUS_MEDIA_WRITE_PROTECTED */
1078 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1080 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/* delete method installed on records handed out by the read-only traverse callbacks: always refuses with NT_STATUS_MEDIA_WRITE_PROTECTED */
1083 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1085 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1088 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1090 struct traverse_state *state = (struct traverse_state *)private_data;
1091 struct db_record rec;
1092 rec.key = key;
1093 rec.value = data;
1094 rec.store = db_ctdb_store_deny;
1095 rec.delete_rec = db_ctdb_delete_deny;
1096 rec.private_data = state->db;
1097 state->fn(&rec, state->private_data);
1100 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1101 void *private_data)
1103 struct traverse_state *state = (struct traverse_state *)private_data;
1104 struct db_record rec;
1105 rec.key = kbuf;
1106 rec.value = dbuf;
1107 rec.store = db_ctdb_store_deny;
1108 rec.delete_rec = db_ctdb_delete_deny;
1109 rec.private_data = state->db;
1111 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1112 /* a deleted record */
1113 return 0;
1115 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1116 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1118 return state->fn(&rec, state->private_data);
1121 static int db_ctdb_traverse_read(struct db_context *db,
1122 int (*fn)(struct db_record *rec,
1123 void *private_data),
1124 void *private_data)
1126 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1127 struct db_ctdb_ctx);
1128 struct traverse_state state;
1130 state.db = db;
1131 state.fn = fn;
1132 state.private_data = private_data;
1134 if (db->persistent) {
1135 /* for persistent databases we don't need to do a ctdb traverse,
1136 we can do a faster local traverse */
1137 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1140 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1141 return 0;
1144 static int db_ctdb_get_seqnum(struct db_context *db)
1146 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1147 struct db_ctdb_ctx);
1148 return tdb_get_seqnum(ctx->wtdb->tdb);
1151 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1152 const char *name,
1153 int hash_size, int tdb_flags,
1154 int open_flags, mode_t mode)
1156 struct db_context *result;
1157 struct db_ctdb_ctx *db_ctdb;
1158 char *db_path;
1160 if (!lp_clustering()) {
1161 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1162 return NULL;
1165 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1166 DEBUG(0, ("talloc failed\n"));
1167 TALLOC_FREE(result);
1168 return NULL;
1171 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1172 DEBUG(0, ("talloc failed\n"));
1173 TALLOC_FREE(result);
1174 return NULL;
1177 db_ctdb->transaction = NULL;
1178 db_ctdb->db = result;
1180 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1181 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1182 TALLOC_FREE(result);
1183 return NULL;
1186 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1188 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1190 /* only pass through specific flags */
1191 tdb_flags &= TDB_SEQNUM;
1193 /* honor permissions if user has specified O_CREAT */
1194 if (open_flags & O_CREAT) {
1195 chmod(db_path, mode);
1198 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1199 if (db_ctdb->wtdb == NULL) {
1200 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1201 TALLOC_FREE(result);
1202 return NULL;
1204 talloc_free(db_path);
1206 result->private_data = (void *)db_ctdb;
1207 result->fetch_locked = db_ctdb_fetch_locked;
1208 result->fetch = db_ctdb_fetch;
1209 result->traverse = db_ctdb_traverse;
1210 result->traverse_read = db_ctdb_traverse_read;
1211 result->get_seqnum = db_ctdb_get_seqnum;
1212 result->transaction_start = db_ctdb_transaction_start;
1213 result->transaction_commit = db_ctdb_transaction_commit;
1214 result->transaction_cancel = db_ctdb_transaction_cancel;
1216 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1217 name, db_ctdb->db_id));
1219 return result;
1221 #endif